From 21754a009cada77b9c8613354249b220677af478 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 27 Jul 2024 18:03:16 +0800 Subject: [PATCH 1/5] enh: add more check return --- include/common/tmsg.h | 9 ++- source/common/src/tdataformat.c | 6 +- source/dnode/vnode/src/meta/metaQuery.c | 30 +++++--- source/dnode/vnode/src/meta/metaTable.c | 14 ++-- source/dnode/vnode/src/tsdb/tsdbFS2.c | 6 +- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 87 ++++++++++++++-------- source/dnode/vnode/src/vnd/vnodeSvr.c | 41 ++++++++-- source/libs/sync/src/syncRespMgr.c | 4 +- source/libs/tdb/src/db/tdbBtree.c | 15 +--- source/util/src/tarray.c | 2 - source/util/src/tcache.c | 2 +- source/util/src/tlrucache.c | 8 +- 12 files changed, 144 insertions(+), 80 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4830593616..75a67ea484 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2899,7 +2899,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) { } pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t)); if (pRebInfo->newConsumers == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } return pRebInfo; @@ -3455,7 +3454,9 @@ static FORCE_INLINE void* taosDecodeSMqTopicInfoMsg(void* buf, SMqTopicInfo* pTo buf = taosDecodeStringTo(buf, pTopicInfo->name); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); - pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqReportVgInfo)); + if ((pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqReportVgInfo))) == NULL) { + return NULL; + } for (int32_t i = 0; i < sz; i++) { SMqReportVgInfo vgInfo; buf = taosDecodeSMqVgInfo(buf, &vgInfo); @@ -3493,7 +3494,9 @@ static FORCE_INLINE void* taosDecodeSMqReportMsg(void* buf, SMqReportReq* pMsg) buf = taosDecodeFixedI64(buf, &pMsg->consumerId); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); - pMsg->pTopics = taosArrayInit(sz, sizeof(SMqTopicInfo)); + if ((pMsg->pTopics = taosArrayInit(sz, sizeof(SMqTopicInfo))) == NULL) { + return NULL; + } for (int32_t i = 0; i < sz; i++) { SMqTopicInfo topicInfo; buf = taosDecodeSMqTopicInfoMsg(buf, &topicInfo); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index dabdc630f5..371fc130f0 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -459,7 +459,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, SColVal colVal; if ((colValArray = taosArrayInit(numOfInfos, sizeof(SColVal))) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } for (int32_t iRow = 0; iRow < numOfRows; iRow++) { @@ -670,7 +670,7 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, // merge aColVal = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); if (aColVal == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _exit; } @@ -1748,7 +1748,7 @@ int32_t tTagToValArray(const STag *pTag, SArray **ppArray) { (*ppArray) = taosArrayInit(pTag->nTag + 1, sizeof(STagVal)); if (*ppArray == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _err; } diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index e4a04047f1..6640f4aa07 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1070,13 +1070,15 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) { int32_t cmp = (*param->filterFunc)((void *)&p->btime, (void *)&pBtimeKey->btime, param->type); if (cmp == 0) - taosArrayPush(pUids, &p->uid); - else { - if (param->equal == true) { - if (count > TRY_ERROR_LIMIT) break; - count++; + if (taosArrayPush(pUids, &p->uid) == NULL) { + ret = terrno; + break; + } else { + if (param->equal == true) { + if (count > TRY_ERROR_LIMIT) break; + count++; + } } - } valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); if (valid < 0) break; } while (1); @@ -1132,7 +1134,10 @@ int32_t metaFilterTableName(void *pVnode, SMetaFltParam *arg, SArray *pUids) { cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type); if (cmp == 0) { tb_uid_t tuid = *(tb_uid_t *)pEntryVal; - taosArrayPush(pUids, &tuid); + if (taosArrayPush(pUids, &tuid) == NULL) { + ret = terrno; + goto END; + } } else { if (param->equal == true) { if (count > TRY_ERROR_LIMIT) break; @@ -1328,7 +1333,10 @@ int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) { } else { tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes); } - taosArrayPush(pUids, &tuid); + if (taosArrayPush(pUids, &tuid) == NULL) { + ret = terrno; + break; + } found = true; } else { if (param->equal == true) { @@ -1432,7 +1440,11 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) { STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; info.pTagVal = taosMemoryMalloc(pCur->vLen); memcpy(info.pTagVal, pCur->pVal, pCur->vLen); - taosArrayPush(pUidTagInfo, &info); + if (taosArrayPush(pUidTagInfo, &info) == NULL) { + metaCloseCtbCursor(pCur); + taosHashCleanup(pSepecifiedUidMap); + return TSDB_CODE_OUT_OF_MEMORY; + } } } else { // only the specified tables need to be added while (1) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 52151e76d6..04447fd3ce 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -341,7 +341,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb break; } - taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid)); + (void)taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid)); } tdbTbcClose(pCtbIdxc); @@ -405,7 +405,7 @@ static void metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) { break; } - taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid)); + (void)taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid)); } tdbFree(pKey); @@ -1033,7 +1033,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi } if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) { - taosArrayPush(tbUids, &uid); + (void)taosArrayPush(tbUids, &uid); if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL); @@ -1135,7 +1135,7 @@ static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) { tbFName[TSDB_TABLE_FNAME_LEN] = '\0'; int32_t ret = vnodeValidateTableHash(pMeta->pVnode, tbFName); if (ret < 0 && terrno == TSDB_CODE_VND_HASH_MISMATCH) { - taosArrayPush(uidList, &me.uid); + (void)taosArrayPush(uidList, &me.uid); } } tDecoderClear(&dc); @@ -1783,11 +1783,11 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA } else { memcpy(&val.i64, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal); } - taosArrayPush(pTagArray, &val); + (void)taosArrayPush(pTagArray, &val); } else { STagVal val = {.cid = pCol->colId}; if (tTagGet(pOldTag, &val)) { - taosArrayPush(pTagArray, &val); + (void)taosArrayPush(pTagArray, &val); } } } @@ -2171,7 +2171,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT } SMetaPair pair = {.key = pKey, nKey = nKey}; - taosArrayPush(tagIdxList, &pair); + (void)taosArrayPush(tagIdxList, &pair); } tdbTbcClose(pTagIdxc); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 68954e5156..3b0a40c475 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -770,7 +770,11 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { STFileSet *fset; TARRAY2_FOREACH(fs->fSetArr, fset) { if (fset->channelOpened) { - taosArrayPush(channelArray, &fset->channel); + if (taosArrayPush(channelArray, &fset->channel) == NULL) { + taosArrayDestroy(channelArray); + taosThreadMutexUnlock(&pTsdb->mutex); + return terrno; + } fset->channel = (SVAChannelID){0}; fset->mergeScheduled = false; tsdbFSSetBlockCommit(fset, false); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 64569e63ac..0cb6a152b0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -88,6 +88,9 @@ int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket; if (pBuf->pData == NULL) { pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); + if (pBuf->pData == NULL) { + return terrno; + } } for (int32_t i = 0; i < num; ++i) { @@ -163,22 +166,22 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in if (IS_NUMERIC_TYPE(type)) { if (asc) { - switch(type) { + switch (type) { case TSDB_DATA_TYPE_BIGINT: { pKey->pks[0].val = INT64_MIN; break; } - case TSDB_DATA_TYPE_INT:{ + case TSDB_DATA_TYPE_INT: { int32_t min = INT32_MIN; (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; } - case TSDB_DATA_TYPE_SMALLINT:{ + case TSDB_DATA_TYPE_SMALLINT: { int16_t min = INT16_MIN; (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; } - case TSDB_DATA_TYPE_TINYINT:{ + case TSDB_DATA_TYPE_TINYINT: { int8_t min = INT8_MIN; (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; @@ -194,15 +197,31 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in ASSERT(0); } } else { - switch(type) { - case TSDB_DATA_TYPE_BIGINT:pKey->pks[0].val = INT64_MAX;break; - case TSDB_DATA_TYPE_INT:pKey->pks[0].val = INT32_MAX;break; - case TSDB_DATA_TYPE_SMALLINT:pKey->pks[0].val = INT16_MAX;break; - case TSDB_DATA_TYPE_TINYINT:pKey->pks[0].val = INT8_MAX;break; - case TSDB_DATA_TYPE_UBIGINT:pKey->pks[0].val = UINT64_MAX;break; - case TSDB_DATA_TYPE_UINT:pKey->pks[0].val = UINT32_MAX;break; - case TSDB_DATA_TYPE_USMALLINT:pKey->pks[0].val = UINT16_MAX;break; - case TSDB_DATA_TYPE_UTINYINT:pKey->pks[0].val = UINT8_MAX;break; + switch (type) { + case TSDB_DATA_TYPE_BIGINT: + pKey->pks[0].val = INT64_MAX; + break; + case TSDB_DATA_TYPE_INT: + pKey->pks[0].val = INT32_MAX; + break; + case TSDB_DATA_TYPE_SMALLINT: + pKey->pks[0].val = INT16_MAX; + break; + case TSDB_DATA_TYPE_TINYINT: + pKey->pks[0].val = INT8_MAX; + break; + case TSDB_DATA_TYPE_UBIGINT: + pKey->pks[0].val = UINT64_MAX; + break; + case TSDB_DATA_TYPE_UINT: + pKey->pks[0].val = UINT32_MAX; + break; + case TSDB_DATA_TYPE_USMALLINT: + pKey->pks[0].val = UINT16_MAX; + break; + case TSDB_DATA_TYPE_UTINYINT: + pKey->pks[0].val = UINT8_MAX; + break; default: ASSERT(0); } @@ -232,7 +251,7 @@ void clearRowKey(SRowKey* pKey) { taosMemoryFreeClear(pKey->pks[0].pData); } -static int32_t initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { +static int32_t initLastProcKey(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { int32_t code = 0; int32_t numOfPks = pReader->suppInfo.numOfPks; bool asc = ASCENDING_TRAVERSE(pReader->info.order); @@ -448,8 +467,8 @@ void cleanupInfoForNextFileset(SSHashObj* pTableMap) { // brin records iterator void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) { - (void) memset(&pIter->block, 0, sizeof(SBrinBlock)); - (void) memset(&pIter->record, 0, sizeof(SBrinRecord)); + (void)memset(&pIter->block, 0, sizeof(SBrinBlock)); + (void)memset(&pIter->record, 0, sizeof(SBrinRecord)); pIter->blockIndex = -1; pIter->recordIndex = -1; @@ -471,7 +490,7 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) { return TSDB_CODE_INVALID_PARA; } - (void) tBrinBlockClear(&pIter->block); + (void)tBrinBlockClear(&pIter->block); int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block); if (code != TSDB_CODE_SUCCESS) { tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code)); @@ -488,7 +507,7 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) { return code; } -void clearBrinBlockIter(SBrinRecordIter* pIter) { (void) tBrinBlockDestroy(&pIter->block); } +void clearBrinBlockIter(SBrinRecordIter* pIter) { (void)tBrinBlockDestroy(&pIter->block); } // initialize the file block access order // sort the file blocks according to the offset of each data block in the files @@ -658,7 +677,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); for (int32_t i = 0; i < numOfBlocks; ++i) { STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; - void* px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + void* px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -774,6 +793,9 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ if (pScanInfo->pFileDelData == NULL) { pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); + if (pScanInfo->pFileDelData == NULL) { + return terrno; + } } for (int32_t k = 0; k < pBlock->numOfRecords; ++k) { @@ -810,6 +832,9 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ if (pScanInfo->pFileDelData == NULL) { pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); + if (pScanInfo->pFileDelData == NULL) { + return terrno; + } } } @@ -821,7 +846,7 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ if (record.version <= pReader->info.verRange.maxVer) { SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; - void* px = taosArrayPush(pScanInfo->pFileDelData, &delData); + void* px = taosArrayPush(pScanInfo->pFileDelData, &delData); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -878,7 +903,7 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs ETombBlkCheckEnum ret = 0; code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret); - (void) tTombBlockDestroy(&block); + (void)tTombBlockDestroy(&block); if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) { return code; } @@ -977,7 +1002,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo SStatisBlk* p = &pStatisBlkArray->data[i]; STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); - (void) tStatisBlockInit(pStatisBlock); + (void)tStatisBlockInit(pStatisBlock); int64_t st = taosGetTimestampMs(); int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock); @@ -995,7 +1020,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } if (index >= pStatisBlock->numOfRecords) { - (void) tStatisBlockDestroy(pStatisBlock); + (void)tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -1005,7 +1030,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) { p = &pStatisBlkArray->data[i]; if (p->minTbid.suid > suid) { - (void) tStatisBlockDestroy(pStatisBlock); + (void)tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -1025,7 +1050,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } } - (void) tStatisBlockDestroy(pStatisBlock); + (void)tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -1037,7 +1062,7 @@ static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlo (*i) += 1; (*j) = 0; if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) { - (void) tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); + (void)tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); } } } @@ -1049,7 +1074,7 @@ int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { int32_t inc = numOfFileObj - size; for (int32_t k = 0; k < inc; ++k) { SLDataIter* pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); - void* px = taosArrayPush(pLDIterList, &pIter); + void* px = taosArrayPush(pLDIterList, &pIter); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -1073,6 +1098,9 @@ int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) // add the list/iter placeholder while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) { SArray* pList = taosArrayInit(4, POINTER_BYTES); + if (pList == NULL) { + return terrno; + } void* px = taosArrayPush(pSttFileBlockIterArray, &pList); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1210,8 +1238,7 @@ static int32_t sortUidComparFn(const void* p1, const void* p2) { return ret; } -bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, - int32_t order) { +bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order) { // check if it overlap with del skyline taosArraySort(pKeyRangeList, sortUidComparFn); @@ -1242,7 +1269,7 @@ bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlo } STimeWindow w2 = {.skey = p2->skey.ts, .ekey = p2->ekey.ts}; - bool overlap = overlapWithTimeWindow(&w2, pQueryWindow, pScanInfo, order); + bool overlap = overlapWithTimeWindow(&w2, pQueryWindow, pScanInfo, order); if (overlap) { return false; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 45b2abeb24..1b656442a5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1124,7 +1124,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name); if (vnodeValidateTableHash(pVnode, tbName) < 0) { cRsp.code = TSDB_CODE_VND_HASH_MISMATCH; - taosArrayPush(rsp.pArray, &cRsp); + if (taosArrayPush(rsp.pArray, &cRsp) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + rcode = -1; + goto _exit; + } vError("vgId:%d create-table:%s failed due to hash value mismatch", TD_VID(pVnode), tbName); continue; } @@ -1139,11 +1143,19 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, } else { cRsp.code = TSDB_CODE_SUCCESS; tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); - taosArrayPush(tbUids, &pCreateReq->uid); + if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + rcode = -1; + goto _exit; + } vnodeUpdateMetaRsp(pVnode, cRsp.pMeta); } - taosArrayPush(rsp.pArray, &cRsp); + if (taosArrayPush(rsp.pArray, &cRsp) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + rcode = -1; + goto _exit; + } } vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids)); @@ -1375,12 +1387,20 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid); } - taosArrayPush(rsp.pArray, &dropTbRsp); + if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pRsp->code = terrno; + goto _exit; + } if (tsEnableAuditCreateTable) { char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); strcpy(str, pDropTbReq->name); - taosArrayPush(tbNames, &str); + if (taosArrayPush(tbNames, &str) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pRsp->code = terrno; + goto _exit; + } } } @@ -1499,11 +1519,13 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) { taosArrayDestroy(pCxt->pColValues); pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal)); if (NULL == pCxt->pColValues) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) { SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type); - taosArrayPush(pCxt->pColValues, &val); + if (taosArrayPush(pCxt->pColValues, &val) == NULL) { + return terrno; + } } return TSDB_CODE_SUCCESS; @@ -1819,7 +1841,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in goto _exit; } - taosArrayPush(newTbUids, &pSubmitTbData->uid); + if (taosArrayPush(newTbUids, &pSubmitTbData->uid) == NULL) { + code = terrno; + goto _exit; + } if (pCreateTbRsp->pMeta) { vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 4663a1f6e9..aa7a6da0a2 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -142,7 +142,9 @@ static int32_t syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { int64_t nowMS = taosGetTimestampMs(); if (nowMS - pStub->createTime > ttl || -1 == ttl) { - taosArrayPush(delIndexArray, pSeqNum); + if (taosArrayPush(delIndexArray, pSeqNum) == NULL) { + return terrno; + } cnt++; SFsmCbMeta cbMeta = { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 187d65d975..f86ed69fc3 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1366,11 +1366,6 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, if (ret < 0) { return ret; } - /* - if (pDecoder->ofps) { - taosArrayPush(pDecoder->ofps, &ofp); - } - */ ofpCell = tdbPageGetCell(ofp, 0); if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { @@ -1411,11 +1406,6 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, if (ret < 0) { return ret; } - /* - if (pDecoder->ofps) { - taosArrayPush(pDecoder->ofps, &ofp); - } - */ ofpCell = tdbPageGetCell(ofp, 0); int lastKeyPage = 0; @@ -1642,7 +1632,10 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * SArray *ofps = pPage->pPager->ofps; if (ofps) { - taosArrayPush(ofps, &ofp); + if (taosArrayPush(ofps, &ofp) == NULL) { + ASSERT(0); + return terrno; + } } tdbPagerReturnPage(pPage->pPager, ofp, pTxn); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 49132037d4..660b757bb2 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -34,14 +34,12 @@ SArray* taosArrayInit(size_t size, size_t elemSize) { SArray* pArray = taosMemoryMalloc(sizeof(SArray)); if (pArray == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pArray->size = 0; pArray->pData = taosMemoryCalloc(size, elemSize); if (pArray->pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pArray); return NULL; } diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index c0ed0b98d0..17b985b5e0 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -169,7 +169,7 @@ TdThread doRegisterCacheObj(SCacheObj *pCacheObj) { taosThreadOnce(&cacheThreadInit, doInitRefreshThread); taosThreadMutexLock(&guard); - taosArrayPush(pCacheArrayList, &pCacheObj); + (void)taosArrayPush(pCacheArrayList, &pCacheObj); taosThreadMutexUnlock(&guard); return cacheRefreshWorker; diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 7e165a12d5..e08dc009fc 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -326,7 +326,7 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; - taosArrayPush(deleted, &old); + (void)taosArrayPush(deleted, &old); } } @@ -392,7 +392,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) { TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (handle == NULL) { - taosArrayPush(lastReferenceList, &e); + (void)taosArrayPush(lastReferenceList, &e); } else { if (freeOnFail) { taosMemoryFree(e); @@ -415,7 +415,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; - taosArrayPush(lastReferenceList, &old); + (void)taosArrayPush(lastReferenceList, &old); } } if (handle == NULL) { @@ -536,7 +536,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) { ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; - taosArrayPush(lastReferenceList, &old); + (void)taosArrayPush(lastReferenceList, &old); } (void)taosThreadMutexUnlock(&shard->mutex); From 911e6380b937b4eac381e3a747adbaf133b84df9 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sat, 27 Jul 2024 18:33:08 +0800 Subject: [PATCH 2/5] return value: udf --- include/libs/function/taosudf.h | 24 +- include/libs/function/tudf.h | 23 +- include/os/osDir.h | 2 + include/os/osSysinfo.h | 2 +- include/util/taoserror.h | 1 + include/util/tutil.h | 2 - source/libs/function/inc/tudfInt.h | 2 +- source/libs/function/src/tudf.c | 504 +++++++++++++++++-------- source/libs/function/src/udfd.c | 546 +++++++++++++++++++-------- source/libs/function/test/runUdf.c | 118 ++++-- source/libs/function/test/udf1.c | 11 +- source/libs/function/test/udf1_dup.c | 11 +- source/os/src/osSysinfo.c | 8 +- source/util/src/terror.c | 1 + 14 files changed, 895 insertions(+), 360 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 79abbc4e68..04b92a897a 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -33,6 +33,15 @@ extern "C" { #else #define FORCE_INLINE #endif + +#define TAOS_UDF_CHECK_RETURN(CMD) \ + do { \ + int32_t code = (CMD); \ + if (code != TSDB_CODE_SUCCESS) { \ + return (CMD); \ + } \ + } while (0) + typedef struct SUdfColumnMeta { int16_t type; int32_t bytes; @@ -192,25 +201,28 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void udfColDataSetNull(SUdfColumn *pColumn, int32_t row) { - udfColEnsureCapacity(pColumn, row + 1); +static FORCE_INLINE int32_t udfColDataSetNull(SUdfColumn *pColumn, int32_t row) { + int32_t code = udfColEnsureCapacity(pColumn, row + 1); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) { udfColDataSetNull_var(pColumn, row); } else { udfColDataSetNull_f(pColumn, row); } pColumn->hasNull = true; - pColumn->colData.numOfRows = - ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows; + pColumn->colData.numOfRows = ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows; + return 0; } static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) { SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnData *data = &pColumn->colData; - udfColEnsureCapacity(pColumn, currentRow + 1); + TAOS_UDF_CHECK_RETURN(udfColEnsureCapacity(pColumn, currentRow + 1)); bool isVarCol = IS_VAR_DATA_TYPE(meta->type); if (isNull) { - udfColDataSetNull(pColumn, currentRow); + TAOS_UDF_CHECK_RETURN(udfColDataSetNull(pColumn, currentRow)); } else { if (!isVarCol) { udfColDataSetNotNull_f(pColumn, currentRow); diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 7a8927ca90..acdbc09be6 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -43,6 +43,25 @@ extern "C" { #endif #define UDF_DNODE_ID_ENV_NAME "DNODE_ID" +#define TAOS_UV_LIB_ERROR_RET(ret) \ + do { \ + if (0 != ret) { \ + terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE; \ + return TSDB_CODE_UDF_UV_EXEC_FAILURE; \ + } \ + } while(0) + + +#define TAOS_UV_CHECK_ERRNO(CODE) \ + do { \ + if (0 != CODE) { \ + terrln = __LINE__; \ + terrno = (CODE); \ + goto _exit; \ + } \ + } while (0) + + // low level APIs /** * setup udf @@ -109,13 +128,13 @@ int32_t udfStartUdfd(int32_t startDnodeId); * stop udfd * @return */ -int32_t udfStopUdfd(); +void udfStopUdfd(); /** * get udfd pid * */ - int32_t udfGetUdfdPid(int32_t* pUdfdPid); +// int32_t udfGetUdfdPid(int32_t* pUdfdPid); #ifdef __cplusplus } diff --git a/include/os/osDir.h b/include/os/osDir.h index 533ac8e4a4..e660ac5853 100644 --- a/include/os/osDir.h +++ b/include/os/osDir.h @@ -79,6 +79,8 @@ extern "C" { typedef struct TdDir *TdDirPtr; typedef struct TdDirEntry *TdDirEntryPtr; +#define TAOS_DIRNAME(name) ((void)taosDirName(name)) + void taosRemoveDir(const char *dirname); bool taosDirExist(const char *dirname); int32_t taosMkDir(const char *dirname); diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 7a1df2b81c..5a76be1d1e 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -39,7 +39,7 @@ int64_t taosGetOsUptime(); int32_t taosGetEmail(char *email, int32_t maxLen); int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t maxLen); int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores); -int32_t taosGetCpuCores(float *numOfCores, bool physical); +void taosGetCpuCores(float *numOfCores, bool physical); void taosGetCpuUsage(double *cpu_system, double *cpu_engine); int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512); int32_t taosGetTotalMemory(int64_t *totalKB); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9b49c1908d..6ad643f8d5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -877,6 +877,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x2908) #define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2909) #define TSDB_CODE_UDF_FUNC_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290A) +#define TSDB_CODE_UDF_UV_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290B) // sml #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) diff --git a/include/util/tutil.h b/include/util/tutil.h index 72c4f90fd5..ca75461108 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -16,12 +16,10 @@ #ifndef _TD_UTIL_UTIL_H_ #define _TD_UTIL_UTIL_H_ -#include "os.h" #include "tcrc32c.h" #include "tdef.h" #include "thash.h" #include "tmd5.h" -#include "tutil.h" #ifdef __cplusplus extern "C" { diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 27d3b7930f..3dfe8dcab8 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -109,7 +109,7 @@ void freeUdfDataDataBlock(SUdfDataBlock *block); int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); -int32_t getUdfdPipeName(char *pipeName, int32_t size); +void getUdfdPipeName(char *pipeName, int32_t size); #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c9b8a1e08b..ebb93695fd 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -24,6 +24,7 @@ #include "tdatablock.h" #include "tglobal.h" #include "tudf.h" +#include #include "tudfInt.h" #ifdef _TD_DARWIN_64 @@ -51,11 +52,10 @@ typedef struct SUdfdData { SUdfdData udfdGlobal = {0}; int32_t udfStartUdfd(int32_t startDnodeId); -int32_t udfStopUdfd(); +void udfStopUdfd(); static int32_t udfSpawnUdfd(SUdfdData *pData); void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal); -static int32_t udfSpawnUdfd(SUdfdData *pData); static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg); static void udfUdfdStopAsyncCb(uv_async_t *async); static void udfWatchUdfd(void *args); @@ -67,7 +67,10 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop"); } else { fnInfo("udfd process restart"); - udfSpawnUdfd(pData); + int32_t code = udfSpawnUdfd(pData); + if(code != 0) { + fnError("udfd process restart failed with code:%d", code); + } } } @@ -80,26 +83,26 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { path[0] = '.'; #ifdef WINDOWS GetModuleFileName(NULL, path, PATH_MAX); - taosDirName(path); + TAOS_DIRNAME(path); #elif defined(_TD_DARWIN_64) uint32_t pathSize = sizeof(path); _NSGetExecutablePath(path, &pathSize); - taosDirName(path); + TAOS_DIRNAME(path); #endif } else { - strncpy(path, tsProcPath, PATH_MAX); - taosDirName(path); + TAOS_STRNCPY(path, tsProcPath, PATH_MAX); + TAOS_DIRNAME(path); } #ifdef WINDOWS if (strlen(path) == 0) { - strcat(path, "C:\\TDengine"); + TAOS_STRCAT(path, "C:\\TDengine"); } - strcat(path, "\\udfd.exe"); + TAOS_STRCAT(path, "\\udfd.exe"); #else if (strlen(path) == 0) { - strcat(path, "/usr/bin"); + TAOS_STRCAT(path, "/usr/bin"); } - strcat(path, "/udfd"); + TAOS_STRCAT(path, "/udfd"); #endif char *argsUdfd[] = {path, "-c", configDir, NULL}; options.args = argsUdfd; @@ -107,7 +110,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { options.exit_cb = udfUdfdExit; - uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); + TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1)); uv_stdio_container_t child_stdio[3]; child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; @@ -156,7 +159,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { taosFqdnEnvItem = taosMemoryMalloc(strlen("TAOS_FQDN=") + strlen(taosFqdn) + 1); if (taosFqdnEnvItem != NULL) { strcpy(taosFqdnEnvItem, "TAOS_FQDN="); - strcat(taosFqdnEnvItem, taosFqdn); + TAOS_STRCAT(taosFqdnEnvItem, taosFqdn); fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn); } else { fnError("[UDFD]Failed to allocate memory for TAOS_FQDN"); @@ -212,22 +215,37 @@ static void udfUdfdStopAsyncCb(uv_async_t *async) { static void udfWatchUdfd(void *args) { SUdfdData *pData = args; - uv_loop_init(&pData->loop); - uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb); + TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop)); + TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb)); pData->stopAsync.data = pData; - int32_t err = udfSpawnUdfd(pData); - atomic_store_32(&pData->spawnErr, err); - uv_barrier_wait(&pData->barrier); - uv_run(&pData->loop, UV_RUN_DEFAULT); - uv_loop_close(&pData->loop); + TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData)); + atomic_store_32(&pData->spawnErr, 0); + (void)uv_barrier_wait(&pData->barrier); + int num = uv_run(&pData->loop, UV_RUN_DEFAULT); + fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__); uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL); - uv_run(&pData->loop, UV_RUN_DEFAULT); - uv_loop_close(&pData->loop); + num = uv_run(&pData->loop, UV_RUN_DEFAULT); + fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__); + if(uv_loop_close(&pData->loop) != 0) { + fnError("udfd loop close failed, lino:%d", __LINE__); + } + +_exit: + if (terrno != 0) { + (void)uv_barrier_wait(&pData->barrier); + atomic_store_32(&pData->spawnErr, terrno); + if(uv_loop_close(&pData->loop) != 0) { + fnError("udfd loop close failed, lino:%d", __LINE__); + } + fnError("udfd thread exit with code:%d lino:%d", terrno, terrln); + terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE; + } return; } int32_t udfStartUdfd(int32_t startDnodeId) { + int32_t code = 0, lino = 0; if (!tsStartUdfd) { fnInfo("start udfd is disabled.") return 0; } @@ -239,43 +257,58 @@ int32_t udfStartUdfd(int32_t startDnodeId) { pData->startCalled = true; char dnodeId[8] = {0}; snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId); - uv_os_setenv("DNODE_ID", dnodeId); + TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit); pData->dnodeId = startDnodeId; - uv_barrier_init(&pData->barrier, 2); - uv_thread_create(&pData->thread, udfWatchUdfd, pData); - uv_barrier_wait(&pData->barrier); + TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit); + TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit); + (void)uv_barrier_wait(&pData->barrier); int32_t err = atomic_load_32(&pData->spawnErr); if (err != 0) { uv_barrier_destroy(&pData->barrier); - uv_async_send(&pData->stopAsync); - uv_thread_join(&pData->thread); + if(uv_async_send(&pData->stopAsync) != 0) { + fnError("start udfd: failed to send stop async"); + } + if(uv_thread_join(&pData->thread)!= 0) { + fnError("start udfd: failed to join udfd thread"); + } pData->needCleanUp = false; fnInfo("udfd is cleaned up after spawn err"); + TAOS_CHECK_GOTO(err, &lino, _exit); } else { pData->needCleanUp = true; } - return err; +_exit: + if (code != 0) { + fnError("udfd start failed with code:%d, lino:%d", code, lino); + } + return code; } -int32_t udfStopUdfd() { +void udfStopUdfd() { SUdfdData *pData = &udfdGlobal; fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr); if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { - return 0; + return; } atomic_store_32(&pData->stopCalled, 1); pData->needCleanUp = false; uv_barrier_destroy(&pData->barrier); - uv_async_send(&pData->stopAsync); - uv_thread_join(&pData->thread); + if(uv_async_send(&pData->stopAsync) != 0) { + fnError("stop udfd: failed to send stop async"); + } + if(uv_thread_join(&pData->thread) != 0) { + fnError("stop udfd: failed to join udfd thread"); + } + #ifdef WINDOWS if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle); #endif fnInfo("udfd is cleaned up"); - return 0; + return; } +/* int32_t udfGetUdfdPid(int32_t* pUdfdPid) { SUdfdData *pData = &udfdGlobal; if (pData->spawnErr) { @@ -287,6 +320,7 @@ int32_t udfGetUdfdPid(int32_t* pUdfdPid) { } return TSDB_CODE_SUCCESS; } +*/ //============================================================================================== /* Copyright (c) 2013, Ben Noordhuis @@ -439,8 +473,6 @@ typedef struct SClientUdfTask { SUdfcUvSession *session; - int32_t errCode; - union { struct { SUdfSetupRequest req; @@ -479,7 +511,7 @@ enum { UDFC_STATE_STOPPING, // stopping after udfcClose }; -int32_t getUdfdPipeName(char *pipeName, int32_t size); +void getUdfdPipeName(char *pipeName, int32_t size); int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup); void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request); int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state); @@ -507,7 +539,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output); int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output); -int32_t getUdfdPipeName(char *pipeName, int32_t size) { +void getUdfdPipeName(char *pipeName, int32_t size) { char dnodeId[8] = {0}; size_t dnodeIdSize = sizeof(dnodeId); int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize); @@ -522,7 +554,6 @@ int32_t getUdfdPipeName(char *pipeName, int32_t size) { snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); #endif fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName); - return 0; } int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { @@ -712,16 +743,14 @@ void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownR int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) { int32_t len = 0; - if (buf == NULL) { - len += sizeof(rsp->msgLen); - } else { + len += sizeof(rsp->msgLen); + if (buf != NULL) { *(int32_t *)(*buf) = rsp->msgLen; *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen)); } - if (buf == NULL) { - len += sizeof(rsp->seqNum); - } else { + len += sizeof(rsp->seqNum); + if (buf != NULL) { *(int64_t *)(*buf) = rsp->seqNum; *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum)); } @@ -810,6 +839,9 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *)); for (int32_t i = 0; i < udfBlock->numOfCols; ++i) { udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn)); + if(udfBlock->udfCols[i] == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i); SUdfColumn *udfCol = udfBlock->udfCols[i]; udfCol->colMeta.type = col->info.type; @@ -821,9 +853,15 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); + if(udfCol->colData.varLenCol.varOffsets == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows); udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen); + if(udfCol->colData.varLenCol.payload == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } if (col->reassigned) { for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) { char* pColData = col->pData + col->varmeta.offset[row]; @@ -843,6 +881,9 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows); int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen; udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen); + if(udfCol->colData.fixLenCol.nullBitmap == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } char *bitmap = udfCol->colData.fixLenCol.nullBitmap; memcpy(bitmap, col->nullbitmap, bitmapLen); udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows); @@ -852,15 +893,18 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo memcpy(data, col->pData, dataLen); } } - return 0; + return TSDB_CODE_SUCCESS; } int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { + int32_t code = 0, lino = 0; SUdfColumnMeta* meta = &udfCol->colMeta; SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1); - blockDataAppendColInfo(block, &colInfoData); - blockDataEnsureCapacity(block, udfCol->colData.numOfRows); + code = blockDataAppendColInfo(block, &colInfoData); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows); + TAOS_CHECK_GOTO(code, &lino, _exit); SColumnInfoData *col = bdGetColumnInfoData(block, 0); for (int i = 0; i < udfCol->colData.numOfRows; ++i) { @@ -868,43 +912,20 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { colDataSetNULL(col, i); } else { char* data = udfColDataGetData(udfCol, i); - colDataSetVal(col, i, data, false); + code = colDataSetVal(col, i, data, false); + TAOS_CHECK_GOTO(code, &lino, _exit); } } block->info.rows = udfCol->colData.numOfRows; - return 0; -} - -int32_t convertUdfColumnToDataBlock2(SUdfColumn *udfCol, SSDataBlock *block) { - block->info.rows = udfCol->colData.numOfRows; - block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type); - - block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); - taosArrayPush(block->pDataBlock, &(SColumnInfoData){0}); - SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0); - SUdfColumnMeta *meta = &udfCol->colMeta; - col->info.precision = meta->precision; - col->info.bytes = meta->bytes; - col->info.scale = meta->scale; - col->info.type = meta->type; - col->hasNull = udfCol->hasNull; - SUdfColumnData *data = &udfCol->colData; - - if (!IS_VAR_DATA_TYPE(meta->type)) { - col->nullbitmap = taosMemoryMalloc(data->fixLenCol.nullBitmapLen); - memcpy(col->nullbitmap, data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen); - col->pData = taosMemoryMalloc(data->fixLenCol.dataLen); - memcpy(col->pData, data->fixLenCol.data, data->fixLenCol.dataLen); - } else { - col->varmeta.offset = taosMemoryMalloc(data->varLenCol.varOffsetsLen); - memcpy(col->varmeta.offset, data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen); - col->pData = taosMemoryMalloc(data->varLenCol.payloadLen); - memcpy(col->pData, data->varLenCol.payload, data->varLenCol.payloadLen); +_exit: + if (code != 0) { + fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino); } - return 0; + return TSDB_CODE_SUCCESS; } int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) { + int32_t code = 0, lino = 0; int32_t numOfRows = 0; for (int32_t i = 0; i < numOfCols; ++i) { numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows; @@ -916,16 +937,16 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS SColumnInfoData d = {0}; d.info = pInfo->info; - blockDataAppendColInfo(output, &d); + TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &d), &lino, _exit); } - blockDataEnsureCapacity(output, numOfRows); + TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit); for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDest = taosArrayGet(output->pDataBlock, i); SColumnInfoData* pColInfoData = input[i].columnData; - colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info); + TAOS_CHECK_GOTO(colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info), &lino, _exit); if (input[i].numOfRows < numOfRows) { int32_t startRow = input[i].numOfRows; @@ -936,26 +957,31 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS } else { char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1); for (int j = 0; j < expandRows; ++j) { - colDataSetVal(pDest, startRow+j, src, false); + TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow+j, src, false), &lino, _exit); } - //colDataSetNItems(pColInfoData, startRow, data, expandRows); } } } output->info.rows = numOfRows; - - return 0; +_exit: + if (code != 0) { + fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino); + } + return code; } int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { if (taosArrayGetSize(input->pDataBlock) != 1) { fnError("scalar function only support one column"); - return -1; + return 0; } output->numOfRows = input->info.rows; output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData)); + if(output->columnData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData)); output->colAlloced = true; @@ -1024,7 +1050,7 @@ int compareUdfcFuncSub(const void *elem1, const void *elem2) { } int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { - int32_t code = 0; + int32_t code = 0, line = 0; uv_mutex_lock(&gUdfcProxy.udfStubsMutex); SUdfcFuncStub key = {0}; strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); @@ -1048,7 +1074,10 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { } else { fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); - taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub); + if(taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub) == NULL) { + fnError("acquireUdfFuncHandle: failed to push udf stub to array"); + goto _exit; + } taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub); } } @@ -1060,12 +1089,16 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { stub.handle = *pHandle; ++stub.refCount; stub.createTime = taosGetTimestampUs(); - taosArrayPush(gUdfcProxy.udfStubs, &stub); + if(taosArrayPush(gUdfcProxy.udfStubs, &stub) == NULL) { + fnError("acquireUdfFuncHandle: failed to push udf stub to array"); + goto _exit; + } taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub); } else { *pHandle = NULL; } +_exit: uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return code; } @@ -1092,17 +1125,23 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { void cleanupExpiredUdfs() { int32_t i = 0; SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + if(expiredUdfStubs == NULL) { + fnError("cleanupExpiredUdfs: failed to init array"); + return; + } while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); if (stub->refCount == 0) { fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); - doTeardownUdf(stub->handle); + (void)doTeardownUdf(stub->handle); } else { fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, stub->refCount, stub->createTime, stub->handle); UdfcFuncHandle handle = stub->handle; if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(expiredUdfStubs, stub); + if(taosArrayPush(expiredUdfStubs, stub) == NULL) { + fnError("cleanupExpiredUdfs: failed to push udf stub to array"); + } } else { fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName, stub->refCount, stub->createTime); @@ -1121,16 +1160,18 @@ void cleanupNotExpiredUdfs() { SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i); if (stub->refCount == 0) { fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); - doTeardownUdf(stub->handle); + (void)doTeardownUdf(stub->handle); } else { fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, stub->refCount, stub->createTime, stub->handle); UdfcFuncHandle handle = stub->handle; if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(udfStubs, stub); + if (taosArrayPush(udfStubs, stub) == NULL) { + fnError("cleanupNotExpiredUdfs: failed to push udf stub to array"); + } } else { - fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", - stub->udfName, stub->refCount, stub->createTime); + fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName, + stub->refCount, stub->createTime); } } ++i; @@ -1259,7 +1300,11 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.id.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { - blockDataAppendColInfo(pTempBlock, pInput->pData[i]); + if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) { + fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode); + blockDataDestroy(pTempBlock); + return udfCode; + } } SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows); @@ -1358,12 +1403,16 @@ void onUdfcPipeClose(uv_handle_t *handle) { } int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) { + int32_t code = 0; fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask); if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->rspBuf.base != NULL) { SUdfResponse rsp = {0}; void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); - task->errCode = rsp.code; + code = rsp.code; + if(code != 0) { + fnError("udfc get udf task result failure. code: %d", code); + } switch (task->type) { case UDF_TASK_SETUP: { @@ -1386,14 +1435,23 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * // TODO: the call buffer is setup and freed by udf invocation taosMemoryFree(uvTask->rspBuf.base); } else { - task->errCode = uvTask->errCode; + code = uvTask->errCode; + if(code != 0) { + fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); + } } } else if (uvTask->type == UV_TASK_CONNECT) { - task->errCode = uvTask->errCode; + code = uvTask->errCode; + if(code != 0) { + fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); + } } else if (uvTask->type == UV_TASK_DISCONNECT) { - task->errCode = uvTask->errCode; + code = uvTask->errCode; + if(code != 0) { + fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); + } } - return 0; + return code; } void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { @@ -1542,7 +1600,11 @@ void onUdfcPipeConnect(uv_connect_t *connect, int status) { } uvTask->errCode = status; - uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead); + int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead); + if(code != 0) { + fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code)); + uvTask->errCode = code; + } taosMemoryFree(connect); QUEUE_REMOVE(&uvTask->procTaskQueue); uv_sem_post(&uvTask->taskSem); @@ -1572,16 +1634,37 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT fnError("udfc create uv task, invalid task type : %d", task->type); } int32_t bufLen = encodeUdfRequest(NULL, &request); + if (bufLen <= 0) { + fnError("udfc create uv task, encode request failed. size: %d", bufLen); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } request.msgLen = bufLen; void *bufBegin = taosMemoryMalloc(bufLen); + if(bufBegin == NULL) { + fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen); + return TSDB_CODE_OUT_OF_MEMORY; + } void *buf = bufBegin; - encodeUdfRequest(&buf, &request); + if(encodeUdfRequest(&buf, &request) <= 0) + { + fnError("udfc create uv task, encode request failed. size: %d", bufLen); + taosMemoryFree(bufBegin); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } + uvTask->reqBuf = uv_buf_init(bufBegin, bufLen); uvTask->seqNum = request.seqNum; } else if (uvTaskType == UV_TASK_DISCONNECT) { uvTask->pipe = task->session->udfUvPipe; } - uv_sem_init(&uvTask->taskSem, 0); + if (uv_sem_init(&uvTask->taskSem, 0) != 0) + { + if (uvTaskType == UV_TASK_REQ_RSP) { + taosMemoryFree(uvTask->reqBuf.base); + } + fnError("udfc create uv task, init semaphore failed."); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } return 0; } @@ -1592,7 +1675,11 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { uv_mutex_lock(&udfc->taskQueueMutex); QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue); uv_mutex_unlock(&udfc->taskQueueMutex); - uv_async_send(&udfc->loopTaskAync); + int32_t code = uv_async_send(&udfc->loopTaskAync); + if (code != 0) { + fnError("udfc queue uv task to event loop failed. code: %s", uv_strerror(code)); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } uv_sem_wait(&uvTask->taskSem); fnInfo("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask); @@ -1608,10 +1695,23 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); - uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0); + if(pipe == NULL) { + fnError("udfc event loop start connect task malloc pipe failed."); + return TSDB_CODE_OUT_OF_MEMORY; + } + if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) { + fnError("udfc event loop start connect task uv_pipe_init failed."); + taosMemoryFree(pipe); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } uvTask->pipe = pipe; SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn)); + if(conn == NULL) { + fnError("udfc event loop start connect task malloc conn failed."); + taosMemoryFree(pipe); + return TSDB_CODE_OUT_OF_MEMORY; + } conn->pipe = pipe; conn->readBuf.len = 0; conn->readBuf.cap = 0; @@ -1622,6 +1722,12 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { pipe->data = conn; uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); + if(connReq == NULL) { + fnError("udfc event loop start connect task malloc connReq failed."); + taosMemoryFree(pipe); + taosMemoryFree(conn); + return TSDB_CODE_OUT_OF_MEMORY; + } connReq->data = uvTask; uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect); code = 0; @@ -1633,6 +1739,10 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { code = TSDB_CODE_UDF_PIPE_NOT_EXIST; } else { uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); + if(write == NULL) { + fnError("udfc event loop start req_rsp task malloc write failed."); + return TSDB_CODE_OUT_OF_MEMORY; + } write->data = pipe->data; QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue; QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue); @@ -1726,27 +1836,41 @@ void udfStopAsyncCb(uv_async_t *async) { } void constructUdfService(void *argsThread) { + int32_t code = 0, lino = 0; SUdfcProxy *udfc = (SUdfcProxy *)argsThread; - uv_loop_init(&udfc->uvLoop); + code = uv_loop_init(&udfc->uvLoop); + TAOS_CHECK_GOTO(code, &lino, _exit); - uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb); + code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb); + TAOS_CHECK_GOTO(code, &lino, _exit); udfc->loopTaskAync.data = udfc; - uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb); + code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb); + TAOS_CHECK_GOTO(code, &lino, _exit); udfc->loopStopAsync.data = udfc; - uv_mutex_init(&udfc->taskQueueMutex); + code = uv_mutex_init(&udfc->taskQueueMutex); + TAOS_CHECK_GOTO(code, &lino, _exit); QUEUE_INIT(&udfc->taskQueue); QUEUE_INIT(&udfc->uvProcTaskQueue); - uv_barrier_wait(&udfc->initBarrier); + (void)uv_barrier_wait(&udfc->initBarrier); // TODO return value of uv_run - uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); - uv_loop_close(&udfc->uvLoop); + int num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); + fnInfo("udfc uv loop exit. active handle num: %d", num); + (void)uv_loop_close(&udfc->uvLoop); uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL); - uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); - uv_loop_close(&udfc->uvLoop); + num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); + fnInfo("udfc uv loop exit. active handle num: %d", num); + + (void)uv_loop_close(&udfc->uvLoop); +_exit: + if (code != 0) { + fnError("udfc construct error. code: %d, line: %d", code, lino); + } + fnInfo("udfc construct finished"); } int32_t udfcOpen() { + int32_t code = 0, lino = 0; int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1); if (old == 1) { return 0; @@ -1754,16 +1878,36 @@ int32_t udfcOpen() { SUdfcProxy *proxy = &gUdfcProxy; getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName)); proxy->udfcState = UDFC_STATE_STARTNG; - uv_barrier_init(&proxy->initBarrier, 2); - uv_thread_create(&proxy->loopThread, constructUdfService, proxy); + code = uv_barrier_init(&proxy->initBarrier, 2); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy); + TAOS_CHECK_GOTO(code, &lino, _exit); atomic_store_8(&proxy->udfcState, UDFC_STATE_READY); proxy->udfcState = UDFC_STATE_READY; - uv_barrier_wait(&proxy->initBarrier); - uv_mutex_init(&proxy->udfStubsMutex); + (void)uv_barrier_wait(&proxy->initBarrier); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = uv_mutex_init(&proxy->udfStubsMutex); + TAOS_CHECK_GOTO(code, &lino, _exit); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); + if(proxy->udfStubs == NULL) { + fnError("udfc init failed. udfStubs: %p", proxy->udfStubs); + return -1; + } proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); - uv_mutex_init(&proxy->udfcUvMutex); - fnInfo("udfc initialized") return 0; + if(proxy->expiredUdfStubs == NULL) { + taosArrayDestroy(proxy->udfStubs); + fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs); + return -1; + } + code = uv_mutex_init(&proxy->udfcUvMutex); + TAOS_CHECK_GOTO(code, &lino, _exit); +_exit: + if (code != 0) { + fnError("udfc open error. code: %d, line: %d", code, lino); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } + fnInfo("udfc initialized"); + return 0; } int32_t udfcClose() { @@ -1774,8 +1918,12 @@ int32_t udfcClose() { SUdfcProxy *udfc = &gUdfcProxy; udfc->udfcState = UDFC_STATE_STOPPING; - uv_async_send(&udfc->loopStopAsync); - uv_thread_join(&udfc->loopThread); + if(uv_async_send(&udfc->loopStopAsync) != 0) { + fnError("udfc close error to send stop async"); + } + if(uv_thread_join(&udfc->loopThread) != 0 ) { + fnError("udfc close errir to join loop thread"); + } uv_mutex_destroy(&udfc->taskQueueMutex); uv_barrier_destroy(&udfc->initBarrier); taosArrayDestroy(udfc->expiredUdfStubs); @@ -1788,45 +1936,61 @@ int32_t udfcClose() { } int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { + int32_t code = 0, lino = 0; SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); + if(uvTask == NULL) { + fnError("udfc client task: %p failed to allocate memory for uvTask", task); + return TSDB_CODE_OUT_OF_MEMORY; + } fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe); - udfcInitializeUvTask(task, uvTaskType, uvTask); - udfcQueueUvTask(uvTask); - udfcGetUdfTaskResultFromUvTask(task, uvTask); + code = udfcInitializeUvTask(task, uvTaskType, uvTask); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = udfcQueueUvTask(uvTask); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = udfcGetUdfTaskResultFromUvTask(task, uvTask); + TAOS_CHECK_GOTO(code, &lino, _exit); if (uvTaskType == UV_TASK_CONNECT) { task->session->udfUvPipe = uvTask->pipe; SClientUvConn *conn = uvTask->pipe->data; conn->session = task->session; } + +_exit: + if (code != 0) { + fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, uvTask, code, lino); + } taosMemoryFree(uvTask->reqBuf.base); uvTask->reqBuf.base = NULL; taosMemoryFree(uvTask); - fnDebug("udfc freed uvTask: %p", task); - uvTask = NULL; - return task->errCode; + return code; } int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { + int32_t code = TSDB_CODE_SUCCESS, lino = 0; SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - task->errCode = 0; + if(task == NULL) { + fnError("doSetupUdf, failed to allocate memory for task"); + return TSDB_CODE_OUT_OF_MEMORY; + } task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); + if(task->session == NULL) { + fnError("doSetupUdf, failed to allocate memory for session"); + taosMemoryFree(task); + return TSDB_CODE_OUT_OF_MEMORY; + } task->session->udfc = &gUdfcProxy; task->type = UDF_TASK_SETUP; SUdfSetupRequest *req = &task->_setup.req; strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); - int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT); - if (errCode != 0) { - fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfcProxy)->udfdPipeName); - taosMemoryFree(task->session); - taosMemoryFree(task); - return TSDB_CODE_UDF_PIPE_CONNECT_ERR; - } + code = udfcRunUdfUvTask(task, UV_TASK_CONNECT); + TAOS_CHECK_GOTO(code, &lino, _exit); - udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + TAOS_CHECK_GOTO(code, &lino, _exit); SUdfSetupResponse *rsp = &task->_setup.rsp; task->session->severHandle = rsp->udfHandle; @@ -1834,15 +1998,18 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { task->session->bytes = rsp->bytes; task->session->bufSize = rsp->bufSize; strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN); - if (task->errCode != 0) { - fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) - } else { - fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session); - *funcHandle = task->session; - } - int32_t err = task->errCode; + fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session); + *funcHandle = task->session; taosMemoryFree(task); - return err; + return 0; + +_exit: + if (code != 0) { + fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino); + } + taosMemoryFree(task->session); + taosMemoryFree(task); + return code; } int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, @@ -1854,7 +2021,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf return TSDB_CODE_UDF_PIPE_NOT_EXIST; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - task->errCode = 0; + if(task == NULL) { + fnError("udfc call udf. failed to allocate memory for task"); + return TSDB_CODE_OUT_OF_MEMORY; + } task->session = (SUdfcUvSession *)handle; task->type = UDF_TASK_CALL; @@ -1887,10 +2057,9 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf } } - udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); - - if (task->errCode != 0) { - fnError("call udf failure. err: %d", task->errCode); + int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + if (code != 0) { + fnError("call udf failure. udfcRunUdfUvTask err: %d", code); } else { SUdfCallResponse *rsp = &task->_call.rsp; switch (callType) { @@ -1916,9 +2085,8 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf } } }; - int err = task->errCode; taosMemoryFree(task); - return err; + return code; } int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { @@ -1957,11 +2125,15 @@ int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdf int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; SSDataBlock inputBlock = {0}; - convertScalarParamToDataBlock(input, numOfCols, &inputBlock); + int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock); + if(code != 0) { + fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code); + return code; + } SSDataBlock resultBlock = {0}; int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); if (err == 0) { - convertDataBlockToScalarParm(&resultBlock, output); + err = convertDataBlockToScalarParm(&resultBlock, output); taosArrayDestroy(resultBlock.pDataBlock); } @@ -1970,6 +2142,7 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t } int32_t doTeardownUdf(UdfcFuncHandle handle) { + int32_t code = TSDB_CODE_SUCCESS, lino = 0;; SUdfcUvSession *session = (SUdfcUvSession *)handle; if (session->udfUvPipe == NULL) { @@ -1979,18 +2152,22 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - task->errCode = 0; + if(task == NULL) { + fnError("doTeardownUdf, failed to allocate memory for task"); + taosMemoryFree(session); + return TSDB_CODE_OUT_OF_MEMORY; + } task->session = session; task->type = UDF_TASK_TEARDOWN; SUdfTeardownRequest *req = &task->_teardown.req; req->udfHandle = task->session->severHandle; - udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + TAOS_CHECK_GOTO(code, &lino, _exit); - int32_t err = task->errCode; - - udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); + code = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); + TAOS_CHECK_GOTO(code, &lino, _exit); fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); // TODO: synchronization refactor between libuv event loop and request thread @@ -2000,8 +2177,13 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { conn->session = NULL; } uv_mutex_unlock(&gUdfcProxy.udfcUvMutex); + +_exit: + if (code != 0) { + fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino); + } taosMemoryFree(session); taosMemoryFree(task); - return err; + return code; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 75bed73bb3..7339f115a3 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -54,38 +54,39 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } int32_t udfdCPluginClose() { return 0; } -const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { +int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { char initFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *initSuffix = "_init"; snprintf(initFuncName, sizeof(initFuncName), "%s%s", udfName, initSuffix); - uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)); + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc))); char destroyFuncName[TSDB_FUNC_NAME_LEN + 9] = {0}; char *destroySuffix = "_destroy"; snprintf(destroyFuncName, sizeof(destroyFuncName), "%s%s", udfName, destroySuffix); - uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)); - return udfName; + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc))); + return 0; } -void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { +int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); - uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); + snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc))); char startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char *startSuffix = "_start"; snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix); - uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)); + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc))); char finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0}; char *finishSuffix = "_finish"; snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix); - uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc))); char mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char *mergeSuffix = "_merge"; snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix); - uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); + (void)(uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc))); + return 0; } int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { @@ -99,27 +100,43 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { } const char *udfName = udf->name; - udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); + err = udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); + if (err != 0) { + fnError("can not load init/destroy functions. error: %d", err); + err = TSDB_CODE_UDF_LOAD_UDF_FAILURE; + goto _exit; + } if (udf->funcType == UDF_FUNC_TYPE_SCALAR) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); - uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); + if (uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)) != 0) { + fnError("can not load library function %s. error: %s", processFuncName, uv_strerror(err)); + err = TSDB_CODE_UDF_LOAD_UDF_FAILURE; + goto _exit; + } } else if (udf->funcType == UDF_FUNC_TYPE_AGG) { - udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName); + err = udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName); + if (err != 0) { + fnError("can not load aggregation functions. error: %d", err); + err = TSDB_CODE_UDF_LOAD_UDF_FAILURE; + goto _exit; + } } - int32_t code = 0; if (udfCtx->initFunc) { - code = (udfCtx->initFunc)(); - if (code != 0) { - uv_dlclose(&udfCtx->lib); - taosMemoryFree(udfCtx); - return code; + err = (udfCtx->initFunc)(); + if (err != 0) { + fnError("udf init function failed. error: %d", err); + goto _exit; } } *pUdfCtx = udfCtx; return 0; +_exit: + uv_dlclose(&udfCtx->lib); + taosMemoryFree(udfCtx); + return err; } int32_t udfdCPluginUdfDestroy(void *udfCtx) { @@ -303,7 +320,7 @@ static int32_t udfdConnectToMnode(); static bool udfdRpcRfp(int32_t code, tmsg_t msgType); static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t udfdOpenClientRpc(); -static int32_t udfdCloseClientRpc(); +static void udfdCloseClientRpc(); static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request); static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request); @@ -320,7 +337,7 @@ static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf static void udfdOnNewConnection(uv_stream_t *server, int status); static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); -static int32_t removeListeningPipe(); +static void removeListeningPipe(); static void udfdPrintVersion(); static int32_t udfdParseArgs(int32_t argc, char *argv[]); @@ -330,13 +347,13 @@ static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); static int32_t udfdUvInit(); static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); -static int32_t udfdRun(); +static void udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); -SUdf *udfdNewUdf(const char *udfName); +int32_t udfdNewUdf(SUdf **pUdf, const char *udfName); void udfdGetFuncBodyPath(const SUdf *udf, char *path); -void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { +int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; @@ -349,8 +366,9 @@ void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}}; - plugin->openFunc(items, 1); - return; + int32_t err = plugin->openFunc(items, 1); + if (err != 0) return err; + return 0; } int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) { @@ -412,7 +430,9 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { if (plugin->closeFunc) { - plugin->closeFunc(); + if (plugin->closeFunc() != 0) { + fnError("udf script c plugin close func failed.line:%d", __LINE__); + } } plugin->openFunc = NULL; plugin->closeFunc = NULL; @@ -428,7 +448,9 @@ void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { if (plugin->closeFunc) { - plugin->closeFunc(); + if(plugin->closeFunc() != 0) { + fnError("udf script python plugin close func failed.line:%d", __LINE__); + } } uv_dlclose(&plugin->lib); if (plugin->libLoaded) { @@ -447,14 +469,23 @@ void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { int32_t udfdInitScriptPlugin(int8_t scriptType) { SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); - + if (plugin == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t err = 0; switch (scriptType) { case TSDB_FUNC_SCRIPT_BIN_LIB: - udfdInitializeCPlugin(plugin); + err = udfdInitializeCPlugin(plugin); + if (err != 0) { + fnError("udf script c plugin init failed. error: %d", err); + taosMemoryFree(plugin); + return err; + } break; case TSDB_FUNC_SCRIPT_PYTHON: { - int32_t err = udfdInitializePythonPlugin(plugin); + err = udfdInitializePythonPlugin(plugin); if (err != 0) { + fnError("udf script python plugin init failed. error: %d", err); taosMemoryFree(plugin); return err; } @@ -489,7 +520,7 @@ void udfdDeinitScriptPlugins() { void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUdfRequest request = {0}; - decodeUdfRequest(uvUdf->input.base, &request); + if(decodeUdfRequest(uvUdf->input.base, &request) == NULL) return; switch (request.type) { case UDF_TASK_SETUP: { @@ -544,6 +575,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { if (scriptPlugin == NULL) { err = udfdInitScriptPlugin(udf->scriptType); if (err != 0) { + fnError("udf name %s init script plugin failed. error %d", udfName, err); uv_mutex_unlock(&global.scriptPluginsMutex); return err; } @@ -563,15 +595,15 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { return 0; } -SUdf *udfdNewUdf(const char *udfName) { +int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); udfNew->refCount = 1; udfNew->lastFetchTime = taosGetTimestampMs(); strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); udfNew->state = UDF_STATE_INIT; - uv_mutex_init(&udfNew->lock); - uv_cond_init(&udfNew->condReady); + if (uv_mutex_init(&udfNew->lock) != 0) return TSDB_CODE_UDF_UV_EXEC_FAILURE; + if (uv_cond_init(&udfNew->condReady) != 0) return TSDB_CODE_UDF_UV_EXEC_FAILURE; udfNew->resident = false; udfNew->expired = false; @@ -582,10 +614,28 @@ SUdf *udfdNewUdf(const char *udfName) { break; } } - return udfNew; + *pUdf = udfNew; + return 0; } -SUdf *udfdGetOrCreateUdf(const char *udfName) { +void udfdFreeUdf(void *pData) { + SUdf *pSudf = (SUdf *)pData; + if (pSudf == NULL) { + return; + } + + if (pSudf->scriptPlugin != NULL) { + if(pSudf->scriptPlugin->udfDestroyFunc(pSudf->scriptUdfCtx) != 0) { + fnError("udfdFreeUdf: udfd destroy udf %s failed", pSudf->name); + } + } + + uv_mutex_destroy(&pSudf->lock); + uv_cond_destroy(&pSudf->condReady); + taosMemoryFree(pSudf); +} + +int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) { uv_mutex_lock(&global.udfsMutex); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); int64_t currTime = taosGetTimestampMs(); @@ -594,26 +644,34 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s if (!expired) { ++(*pUdfHash)->refCount; - SUdf *udf = *pUdfHash; + *ppUdf = *pUdfHash; uv_mutex_unlock(&global.udfsMutex); - fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, udf->name, udf->version, - udf->createdTime); - return udf; + fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, (*ppUdf)->name, (*ppUdf)->version, + (*ppUdf)->createdTime); + return 0; } else { (*pUdfHash)->expired = true; fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64, (*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime); - taosHashRemove(global.udfsHash, udfName, strlen(udfName)); + if(taosHashRemove(global.udfsHash, udfName, strlen(udfName)) != 0) { + fnError("udfdGetOrCreateUdf: udfd remove udf %s failed", udfName); + } } } - SUdf *udf = udfdNewUdf(udfName); + int32_t code = udfdNewUdf(ppUdf, udfName); + if(code != 0) { + uv_mutex_unlock(&global.udfsMutex); + return code; + } - SUdf **pUdf = &udf; - taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); + if ((code = taosHashPut(global.udfsHash, udfName, strlen(udfName), ppUdf, POINTER_BYTES)) != 0) { + uv_mutex_unlock(&global.udfsMutex); + return code; + } uv_mutex_unlock(&global.udfsMutex); - return udf; + return 0; } void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { @@ -622,10 +680,13 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfSetupRequest *setup = &request->setup; int32_t code = TSDB_CODE_SUCCESS; - SUdf *udf = NULL; - - udf = udfdGetOrCreateUdf(setup->udfName); + SUdf *udf = NULL; + code = udfdGetOrCreateUdf(&udf, setup->udfName); + if(code != 0) { + fnError("udfdGetOrCreateUdf failed. udf name %s", setup->udfName); + goto _send; + } uv_mutex_lock(&udf->lock); if (udf->state == UDF_STATE_INIT) { udf->state = UDF_STATE_LOADING; @@ -646,6 +707,8 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); handle->udf = udf; +_send: + ; SUdfResponse rsp; rsp.seqNum = request->seqNum; rsp.type = request->type; @@ -656,11 +719,23 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { rsp.setupRsp.bufSize = udf->bufSize; int32_t len = encodeUdfResponse(NULL, &rsp); + if(len < 0) { + fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len); + return; + } rsp.msgLen = len; void *bufBegin = taosMemoryMalloc(len); + if(bufBegin == NULL) { + fnError("udfdProcessSetupRequest: malloc failed. len %d", len); + return; + } void *buf = bufBegin; - encodeUdfResponse(&buf, &rsp); - + if(encodeUdfResponse(&buf, &rsp) < 0) { + fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len); + taosMemoryFree(bufBegin); + return; + } + uvUdf->output = uv_buf_init(bufBegin, len); taosMemoryFree(uvUdf->input.base); @@ -685,30 +760,35 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { output.colMeta.type = udf->outputType; output.colMeta.precision = 0; output.colMeta.scale = 0; - udfColEnsureCapacity(&output, call->block.info.rows); - - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); - freeUdfDataDataBlock(&input); - if(code == 0) convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + if (udfColEnsureCapacity(&output, call->block.info.rows) == TSDB_CODE_SUCCESS) { + SUdfDataBlock input = {0}; + code = convertDataBlockToUdfDataBlock(&call->block, &input); + if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); + freeUdfDataDataBlock(&input); + if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + } freeUdfColumn(&output); break; } case TSDB_UDF_CALL_AGG_INIT: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); + if (outBuf.buf != NULL) { + code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } subRsp->resultBuf = outBuf; break; } case TSDB_UDF_CALL_AGG_PROC: { SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); - freeUdfInterBuf(&call->interBuf); + if (convertDataBlockToUdfDataBlock(&call->block, &input) == TSDB_CODE_SUCCESS) { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); + freeUdfInterBuf(&call->interBuf); + subRsp->resultBuf = outBuf; + } freeUdfDataDataBlock(&input); - subRsp->resultBuf = outBuf; break; } @@ -738,10 +818,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { subRsp->callType = call->callType; int32_t len = encodeUdfResponse(NULL, rsp); + if(len < 0) { + fnError("udfdProcessCallRequest: encode udf response failed. len %d", len); + return; + } rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); + if(encodeUdfResponse(&buf, rsp) < 0) { + fnError("udfdProcessCallRequest: encode udf response failed. len %d", len); + taosMemoryFree(bufBegin); + return; + } + uvUdf->output = uv_buf_init(bufBegin, len); switch (call->callType) { @@ -787,7 +876,11 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { udf->refCount--; if (udf->refCount == 0 && (!udf->resident || udf->expired)) { unloadUdf = true; - taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); + code = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); + if (code != 0) { + fnError("udf name %s remove from hash failed", udf->name); + goto _send; + } } uv_mutex_unlock(&global.udfsMutex); if (unloadUdf) { @@ -798,18 +891,27 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { fnDebug("udfd destroy function returns %d", code); taosMemoryFree(udf); } - taosMemoryFree(handle); +_send: + taosMemoryFree(handle); SUdfResponse response = {0}; SUdfResponse *rsp = &response; rsp->seqNum = request->seqNum; rsp->type = request->type; rsp->code = code; int32_t len = encodeUdfResponse(NULL, rsp); + if (len < 0) { + fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len); + return; + } rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); + if (encodeUdfResponse(&buf, rsp) < 0) { + fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len); + taosMemoryFree(bufBegin); + return; + } uvUdf->output = uv_buf_init(bufBegin, len); taosMemoryFree(uvUdf->input.base); @@ -865,7 +967,10 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { fnError("udfd write udf shared library failed"); return TSDB_CODE_FILE_CORRUPTED; } - taosCloseFile(&file); + if(taosCloseFile(&file) != 0) { + fnError("udfdSaveFuncBodyToFile, udfd close file failed"); + return TSDB_CODE_FILE_CORRUPTED; + } strncpy(udf->path, path, PATH_MAX); return TSDB_CODE_SUCCESS; @@ -888,7 +993,10 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { SConnectRsp connectRsp = {0}; - tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); + if(tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp) < 0){ + fnError("udfd deserialize connect response failed"); + goto _return; + } int32_t now = taosGetTimestampSec(); int32_t delta = abs(now - connectRsp.svrTimestamp); @@ -908,7 +1016,10 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { msgInfo->code = 0; } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { SRetrieveFuncRsp retrieveRsp = {0}; - tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); + if(tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp) < 0){ + fnError("udfd deserialize retrieve func response failed"); + goto _return; + } SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SUdf *udf = msgInfo->param; @@ -940,28 +1051,43 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, udfName); + if(taosArrayPush(retrieveReq.pFuncNames, udfName) == NULL) { + taosArrayDestroy(retrieveReq.pFuncNames); + return terrno; + } int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); + if(contLen < 0) { + taosArrayDestroy(retrieveReq.pFuncNames); + return terrno; + } void *pReq = rpcMallocCont(contLen); - tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); + if(tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq) < 0) { + taosArrayDestroy(retrieveReq.pFuncNames); + rpcFreeCont(pReq); + return terrno; + } taosArrayDestroy(retrieveReq.pFuncNames); SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; msgInfo->param = udf; - uv_sem_init(&msgInfo->resultSem, 0); + if(uv_sem_init(&msgInfo->resultSem, 0) != 0) { + taosMemoryFree(msgInfo); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; rpcMsg.contLen = contLen; rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; rpcMsg.info.ahandle = msgInfo; - rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - uv_sem_destroy(&msgInfo->resultSem); - int32_t code = msgInfo->code; + int32_t code = rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + if (code == 0) { + uv_sem_wait(&msgInfo->resultSem); + uv_sem_destroy(&msgInfo->resultSem); + code = msgInfo->code; + } taosMemoryFree(msgInfo); return code; } @@ -1009,8 +1135,12 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe return -1; } - taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); - mgmtEpSet->numOfEps++; + int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); + if (code != TSDB_CODE_SUCCESS) { + fnError("invalid ep %s", secondEp); + } else { + mgmtEpSet->numOfEps++; + } } if (mgmtEpSet->numOfEps == 0) { @@ -1039,7 +1169,7 @@ int32_t udfdOpenClientRpc() { connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; - taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + TAOS_CHECK_RETURN(taosVersionStrToInt(version, &(rpcInit.compatibilityVer))); global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); @@ -1048,11 +1178,10 @@ int32_t udfdOpenClientRpc() { return 0; } -int32_t udfdCloseClientRpc() { +void udfdCloseClientRpc() { fnInfo("udfd begin closing rpc"); rpcClose(global.clientRpc); fnInfo("udfd finish closing rpc"); - return 0; } void udfdOnWrite(uv_write_t *req, int status) { @@ -1082,7 +1211,11 @@ void udfdSendResponse(uv_work_t *work, int status) { if (udfWork->conn != NULL) { uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); write_req->data = udfWork; - uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); + int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); + if (code != 0) { + fnError("udfd send response error %s", uv_strerror(code)); + taosMemoryFree(write_req); + } } taosMemoryFree(work); } @@ -1146,7 +1279,12 @@ void udfdHandleRequest(SUdfdUvConn *conn) { conn->inputCap = 0; conn->inputTotal = -1; work->data = udfWork; - uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse); + if(uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse) != 0) + { + fnError("udfd queue work failed"); + taosMemoryFree(work); + taosMemoryFree(udfWork); + } } void udfdPipeCloseCb(uv_handle_t *pipe) { @@ -1193,9 +1331,15 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { fnError("udfd new connection error. code: %s", uv_strerror(status)); return; } + int32_t code = 0; uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t)); - uv_pipe_init(global.loop, client, 0); + code = uv_pipe_init(global.loop, client, 0); + if (code) { + fnError("udfd pipe init error %s", uv_strerror(code)); + taosMemoryFree(client); + return; + } if (uv_accept(server, (uv_stream_t *)client) == 0) { SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); ctx->pWorkList = NULL; @@ -1205,7 +1349,13 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { ctx->inputCap = 0; client->data = ctx; ctx->client = (uv_stream_t *)client; - uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead); + code = uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead); + if (code) { + fnError("udfd read start error %s", uv_strerror(code)); + udfdUvHandleError(ctx); + taosMemoryFree(ctx); + taosMemoryFree(client); + } } else { uv_close((uv_handle_t *)client, NULL); } @@ -1214,8 +1364,14 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { fnInfo("udfd signal received: %d\n", signum); uv_fs_t req; - uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); - uv_signal_stop(handle); + int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + if(code) { + fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(code), __LINE__); + } + code = uv_signal_stop(handle); + if(code) { + fnError("stop signal handler failed, reason:%s", uv_strerror(code)); + } uv_stop(global.loop); } @@ -1224,12 +1380,12 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) { if (strcmp(argv[i], "-c") == 0) { if (i < argc - 1) { if (strlen(argv[++i]) >= PATH_MAX) { - printf("config file path overflow"); + (void)printf("config file path overflow"); return -1; } tstrncpy(configDir, argv[i], PATH_MAX); } else { - printf("'-c' requires a parameter, default is %s\n", configDir); + (void)printf("'-c' requires a parameter, default is %s\n", configDir); return -1; } } else if (strcmp(argv[i], "-V") == 0) { @@ -1242,9 +1398,9 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) { } static void udfdPrintVersion() { - printf("udfd version: %s compatible_version: %s\n", version, compatible_version); - printf("git: %s\n", gitinfo); - printf("build: %s\n", buildinfo); + (void)printf("udfd version: %s compatible_version: %s\n", version, compatible_version); + (void)printf("git: %s\n", gitinfo); + (void)printf("build: %s\n", buildinfo); } static int32_t udfdInitLog() { @@ -1270,35 +1426,31 @@ void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { taosMemoryFree(buf->base); } -static int32_t removeListeningPipe() { +static void removeListeningPipe() { uv_fs_t req; int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); uv_fs_req_cleanup(&req); - return err; + if(err) { + fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__); + } } static int32_t udfdUvInit() { - uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); - if (loop) { - uv_loop_init(loop); - } else { - return -1; - } - global.loop = loop; + TAOS_CHECK_RETURN(uv_loop_init(global.loop)); if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit - uv_pipe_init(global.loop, &global.ctrlPipe, 1); - uv_pipe_open(&global.ctrlPipe, 0); - uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); + TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1)); + TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0)); + TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb)); } getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName)); removeListeningPipe(); - uv_pipe_init(global.loop, &global.listeningPipe, 0); + TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.listeningPipe, 0)); - uv_signal_init(global.loop, &global.intrSignal); - uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT); + TAOS_CHECK_RETURN(uv_signal_init(global.loop, &global.intrSignal)); + TAOS_CHECK_RETURN(uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT)); int r; fnInfo("bind to pipe %s", global.listenPipeName); @@ -1321,25 +1473,59 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) { } } -static int32_t udfdRun() { - uv_mutex_init(&global.scriptPluginsMutex); +static int32_t udfdGlobalDataInit() { + uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); + if (loop == NULL) { + fnError("udfd init uv loop failed, mem overflow"); + return -1; + } + global.loop = loop; + + if (uv_mutex_init(&global.scriptPluginsMutex) != 0) { + fnError("udfd init script plugins mutex failed"); + return -1; + } global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - uv_mutex_init(&global.udfsMutex); + if (global.udfsHash == NULL) { + return terrno; + } + // taosHashSetFreeFp(global.udfsHash, udfdFreeUdf); - fnInfo("start udfd event loop"); - uv_run(global.loop, UV_RUN_DEFAULT); - fnInfo("udfd event loop stopped."); - - uv_loop_close(global.loop); - - uv_walk(global.loop, udfdCloseWalkCb, NULL); - uv_run(global.loop, UV_RUN_DEFAULT); - uv_loop_close(global.loop); + if (uv_mutex_init(&global.udfsMutex) != 0) { + fnError("udfd init udfs mutex failed"); + return -2; + } return 0; } +static void udfdGlobalDataDeinit() { + taosHashCleanup(global.udfsHash); + uv_mutex_destroy(&global.udfsMutex); + uv_mutex_destroy(&global.scriptPluginsMutex); + taosMemoryFree(global.loop); + fnInfo("udfd global data deinit"); +} + +static void udfdRun() { + fnInfo("start udfd event loop"); + int32_t code = uv_run(global.loop, UV_RUN_DEFAULT); + if(code != 0) { + fnError("udfd event loop still has active handles or requests."); + } + fnInfo("udfd event loop stopped."); + + (void)uv_loop_close(global.loop); + + uv_walk(global.loop, udfdCloseWalkCb, NULL); + code = uv_run(global.loop, UV_RUN_DEFAULT); + if(code != 0) { + fnError("udfd event loop still has active handles or requests."); + } + (void)uv_loop_close(global.loop); +} + int32_t udfdInitResidentFuncs() { if (strlen(tsUdfdResFuncs) == 0) { return TSDB_CODE_SUCCESS; @@ -1352,13 +1538,17 @@ int32_t udfdInitResidentFuncs() { char func[TSDB_FUNC_NAME_LEN + 1] = {0}; strncpy(func, token, TSDB_FUNC_NAME_LEN); fnInfo("udfd add resident function %s", func); - taosArrayPush(global.residentFuncs, func); + if(taosArrayPush(global.residentFuncs, func) == NULL) + { + taosArrayDestroy(global.residentFuncs); + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; } -int32_t udfdDeinitResidentFuncs() { +void udfdDeinitResidentFuncs() { for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { char *funcName = taosArrayGet(global.residentFuncs, i); SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); @@ -1366,18 +1556,15 @@ int32_t udfdDeinitResidentFuncs() { SUdf *udf = *udfInHash; int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); fnDebug("udfd destroy function returns %d", code); - taosHashRemove(global.udfsHash, funcName, strlen(funcName)); + if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0) + { + fnError("udfd remove resident function %s failed", funcName); + } taosMemoryFree(udf); } } taosArrayDestroy(global.residentFuncs); - return TSDB_CODE_SUCCESS; -} - -int32_t udfdCleanup() { - uv_mutex_destroy(&global.udfsMutex); - taosHashCleanup(global.udfsHash); - return 0; + fnInfo("udfd resident functions are deinit"); } int32_t udfdCreateUdfSourceDir() { @@ -1392,20 +1579,27 @@ int32_t udfdCreateUdfSourceDir() { return code; } -int32_t udfdDestroyUdfSourceDir() { +void udfdDestroyUdfSourceDir() { fnInfo("destory udf source directory %s", global.udfDataDir); taosRemoveDir(global.udfDataDir); - return 0; } int main(int argc, char *argv[]) { + int code = 0; + bool logInitialized = false; + bool cfgInitialized = false; + bool openClientRpcFinished = false; + bool residentFuncsInited = false; + bool udfSourceDirInited = false; + bool globalDataInited = false; + if (!taosCheckSystemIsLittleEnd()) { - printf("failed to start since on non-little-end machines\n"); + (void)printf("failed to start since on non-little-end machines\n"); return -1; } if (udfdParseArgs(argc, argv) != 0) { - printf("failed to start since parse args error\n"); + (void)printf("failed to start since parse args error\n"); return -1; } @@ -1416,47 +1610,89 @@ int main(int argc, char *argv[]) { if (udfdInitLog() != 0) { // ignore create log failed, because this error no matter - printf("failed to start since init log error\n"); + (void)printf("failed to init udfd log."); + } else { + logInitialized = true; // log is initialized } if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { fnError("failed to start since read config error"); - taosCloseLog(); - return -2; + code = -2; + goto _exit; } + cfgInitialized = true; // cfg is initialized + fnInfo("udfd start with config file %s", configDir); - initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); + if (initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp) != 0) { + fnError("init ep set from cfg failed"); + code = -3; + goto _exit; + } + fnInfo("udfd start with mnode ep %s", global.mgmtEp.epSet.eps[0].fqdn); if (udfdOpenClientRpc() != 0) { fnError("open rpc connection to mnode failed"); - taosCloseLog(); - return -3; + code = -4; + goto _exit; } + fnInfo("udfd rpc client is opened"); + openClientRpcFinished = true; // rpc is opened if (udfdCreateUdfSourceDir() != 0) { fnError("create udf source directory failed"); - taosCloseLog(); - return -4; + code = -5; + goto _exit; } + udfSourceDirInited = true; // udf source dir is created + fnInfo("udfd udf source directory is created"); + + if (udfdGlobalDataInit() != 0) { + fnError("init global data failed"); + code = -6; + goto _exit; + } + globalDataInited = true; // global data is inited + fnInfo("udfd global data is inited"); if (udfdUvInit() != 0) { fnError("uv init failure"); - taosCloseLog(); - return -5; + code = -7; + goto _exit; } + fnInfo("udfd uv is inited"); - udfdInitResidentFuncs(); + if (udfdInitResidentFuncs() != 0) { + fnError("init resident functions failed"); + code = -8; + goto _exit; + } + residentFuncsInited = true; // resident functions are inited + fnInfo("udfd resident functions are inited"); udfdRun(); + fnInfo("udfd exit normally"); removeListeningPipe(); - udfdDestroyUdfSourceDir(); - udfdCloseClientRpc(); - - udfdDeinitResidentFuncs(); - udfdDeinitScriptPlugins(); - taosCloseLog(); - udfdCleanup(); - return 0; +_exit: + if (globalDataInited) { + udfdGlobalDataDeinit(); + } + if (residentFuncsInited) { + udfdDeinitResidentFuncs(); + } + if (udfSourceDirInited) { + udfdDestroyUdfSourceDir(); + } + if (openClientRpcFinished) { + udfdCloseClientRpc(); + } + if (cfgInitialized) { + taosCleanupCfg(); + } + if (logInitialized) { + taosCloseLog(); + } + + return code; } diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index aa8b88b738..346cb468b4 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -9,17 +9,20 @@ #include "tglobal.h" #include "tudf.h" +#define TAOSFPRINTF(stream, format, ...) ((void)fprintf(stream, format, ##__VA_ARGS__)) +#define TAOSPRINTF(format, ...) ((void)printf(format, ##__VA_ARGS__)) + static int32_t parseArgs(int32_t argc, char *argv[]) { for (int32_t i = 1; i < argc; ++i) { if (strcmp(argv[i], "-c") == 0) { if (i < argc - 1) { if (strlen(argv[++i]) >= PATH_MAX) { - printf("config file path overflow"); + TAOSPRINTF("config file path overflow"); return -1; } tstrncpy(configDir, argv[i], PATH_MAX); } else { - printf("'-c' requires a parameter, default is %s\n", configDir); + TAOSPRINTF("'-c' requires a parameter, default is %s\n", configDir); return -1; } } @@ -35,6 +38,7 @@ static int32_t initLog() { } int scalarFuncTest() { + int32_t ret = 0; UdfcFuncHandle handle; if (doSetupUdf("udf1", &handle) != 0) { @@ -47,10 +51,18 @@ int scalarFuncTest() { SSDataBlock *pBlock = █ for (int32_t i = 0; i < 1; ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); - blockDataAppendColInfo(pBlock, &colInfo); + ret = blockDataAppendColInfo(pBlock, &colInfo); + if (ret != 0) { + fnError("failed to append column info"); + return -1; + } } - blockDataEnsureCapacity(pBlock, 1024); + ret = blockDataEnsureCapacity(pBlock, 1024); + if (ret != 0) { + fnError("failed to ensure capacity"); + return -1; + } pBlock->info.rows = 1024; SColumnInfoData *pCol = taosArrayGet(pBlock->pDataBlock, 0); @@ -63,38 +75,56 @@ int scalarFuncTest() { input.columnData = taosArrayGet(pBlock->pDataBlock, 0); SScalarParam output = {0}; - doCallUdfScalarFunc(handle, &input, 1, &output); + ret = doCallUdfScalarFunc(handle, &input, 1, &output); + if (ret != 0) { + fnError("failed to call udf scalar function"); + return -1; + } taosArrayDestroy(pBlock->pDataBlock); SColumnInfoData *col = output.columnData; for (int32_t i = 0; i < output.numOfRows; ++i) { - if (i % 100 == 0) fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); + if (i % 100 == 0) TAOSFPRINTF(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); } colDataDestroy(output.columnData); taosMemoryFree(output.columnData); } int64_t end = taosGetTimestampUs(); - fprintf(stderr, "time: %f\n", (end - beg) / 1000.0); - doTeardownUdf(handle); + TAOSFPRINTF(stderr, "time: %f\n", (end - beg) / 1000.0); + ret = doTeardownUdf(handle); + if (ret != 0) { + fnError("failed to teardown udf"); + return -1; + } return 0; } int aggregateFuncTest() { + int32_t ret = 0; UdfcFuncHandle handle; - if (doSetupUdf("udf2", &handle) != 0) { - fnError("setup udf failure"); + ret = doSetupUdf("udf2", &handle); + if (ret != 0) { + fnError("setup udf failure, code:%d", ret); return -1; } SSDataBlock *pBlock = createDataBlock(); for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); - blockDataAppendColInfo(pBlock, &colInfo); + ret = blockDataAppendColInfo(pBlock, &colInfo); + if(ret != 0) { + fnError( "failed to append column info. code:%d", ret); + return -1; + } } - blockDataEnsureCapacity(pBlock, 1024); + ret = blockDataEnsureCapacity(pBlock, 1024); + if (ret != 0) { + fnError( "failed to ensure capacity. code:%d", ret); + return -1; + } pBlock->info.rows = 1024; SColumnInfoData *pColInfo = bdGetColumnInfoData(pBlock, 0); @@ -105,37 +135,77 @@ int aggregateFuncTest() { SUdfInterBuf buf = {0}; SUdfInterBuf newBuf = {0}; SUdfInterBuf resultBuf = {0}; - doCallUdfAggInit(handle, &buf); - doCallUdfAggProcess(handle, pBlock, &buf, &newBuf); + ret = doCallUdfAggInit(handle, &buf); + if (ret != 0) { + fnError("failed to init udf. code:%d", ret); + return -1; + } + ret = doCallUdfAggProcess(handle, pBlock, &buf, &newBuf); + if (ret != 0) { + fnError("failed to process udf. code:%d", ret); + return -1; + } taosArrayDestroy(pBlock->pDataBlock); - doCallUdfAggFinalize(handle, &newBuf, &resultBuf); + ret = doCallUdfAggFinalize(handle, &newBuf, &resultBuf); + if (ret != 0) { + TAOSFPRINTF(stderr,"failed to finalize udf. code:%d", ret); + return -1; + } if (resultBuf.buf != NULL) { - fprintf(stderr, "agg result: %f\n", *(double *)resultBuf.buf); + TAOSFPRINTF(stderr, "agg result: %f\n", *(double *)resultBuf.buf); } else { - fprintf(stderr, "result buffer is null"); + fnError("result buffer is null"); } freeUdfInterBuf(&buf); freeUdfInterBuf(&newBuf); freeUdfInterBuf(&resultBuf); - doTeardownUdf(handle); + ret = doTeardownUdf(handle); + if (ret != 0) { + fnError("failed to teardown udf. code:%d", ret); + return -1; + } blockDataDestroy(pBlock); return 0; } int main(int argc, char *argv[]) { - parseArgs(argc, argv); - initLog(); + int32_t ret = 0; + ret = parseArgs(argc, argv); + if (ret != 0) { + fnError("failed to parse args"); + return -1; + } + ret = initLog(); + if (ret != 0) { + fnError("failed to init log"); + return -1; + } if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { fnError("failed to start since read config error"); return -1; } - udfcOpen(); + if (udfcOpen() != 0) { + fnError("failed to open udfc"); + return -1; + } uv_sleep(1000); - scalarFuncTest(); - aggregateFuncTest(); - udfcClose(); + ret = scalarFuncTest(); + if (ret != 0) { + fnError("failed to run scalar function test"); + return -1; + } + ret = aggregateFuncTest(); + if (ret != 0) { + fnError("failed to run aggregate function test"); + return -1; + } + ret = udfcClose(); + if (ret != 0) { + fnError("failed to close udfc"); + return -1; + } } diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index 5b95087996..da30ede8bf 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -14,18 +14,25 @@ DLL_EXPORT int32_t udf1_init() { return 0; } DLL_EXPORT int32_t udf1_destroy() { return 0; } DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) { + int32_t code = 0; SUdfColumnData *resultData = &resultCol->colData; for (int32_t i = 0; i < block->numOfRows; ++i) { int j = 0; for (; j < block->numOfCols; ++j) { if (udfColDataIsNull(block->udfCols[j], i)) { - udfColDataSetNull(resultCol, i); + code = udfColDataSetNull(resultCol, i); + if (code != 0) { + return code; + } break; } } if (j == block->numOfCols) { int32_t luckyNum = 1; - udfColDataSet(resultCol, i, (char *)&luckyNum, false); + code = udfColDataSet(resultCol, i, (char *)&luckyNum, false); + if (code != 0) { + return code; + } } } // to simulate actual processing delay by udf diff --git a/source/libs/function/test/udf1_dup.c b/source/libs/function/test/udf1_dup.c index c251192da3..8e0af947b9 100644 --- a/source/libs/function/test/udf1_dup.c +++ b/source/libs/function/test/udf1_dup.c @@ -15,18 +15,25 @@ DLL_EXPORT int32_t udf1_dup_init() { return 0; } DLL_EXPORT int32_t udf1_dup_destroy() { return 0; } DLL_EXPORT int32_t udf1_dup(SUdfDataBlock *block, SUdfColumn *resultCol) { + int32_t code = 0; SUdfColumnData *resultData = &resultCol->colData; for (int32_t i = 0; i < block->numOfRows; ++i) { int j = 0; for (; j < block->numOfCols; ++j) { if (udfColDataIsNull(block->udfCols[j], i)) { - udfColDataSetNull(resultCol, i); + code = udfColDataSetNull(resultCol, i); + if (code != 0) { + return code; + } break; } } if (j == block->numOfCols) { int32_t luckyNum = 2; - udfColDataSet(resultCol, i, (char *)&luckyNum, false); + code = udfColDataSet(resultCol, i, (char *)&luckyNum, false); + if (code != 0) { + return code; + } } } // to simulate actual processing delay by udf diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 9bd60be2b6..b7880ba0cf 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -552,22 +552,22 @@ _end: #endif } -int32_t taosGetCpuCores(float *numOfCores, bool physical) { +void taosGetCpuCores(float *numOfCores, bool physical) { #ifdef WINDOWS SYSTEM_INFO info; GetSystemInfo(&info); *numOfCores = info.dwNumberOfProcessors; - return 0; + return; #elif defined(_TD_DARWIN_64) *numOfCores = sysconf(_SC_NPROCESSORS_ONLN); - return 0; + return; #else if (physical) { *numOfCores = sysconf(_SC_NPROCESSORS_ONLN); } else { taosCntrGetCpuCores(numOfCores); } - return 0; + return; #endif } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 3801ae9ffd..3f3bee4e35 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -723,6 +723,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_FUNC_EXEC_FAILURE, "udf function execution failure") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_UV_EXEC_FAILURE, "udf uvlib function execution failure") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") From ea4065d36b06b3094fb139428eb2fda465c9dc46 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 27 Jul 2024 20:09:29 +0800 Subject: [PATCH 3/5] fix possible error --- source/dnode/vnode/src/meta/metaQuery.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 6640f4aa07..ee616d7a0d 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1069,16 +1069,17 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) { if (count > TRY_ERROR_LIMIT) break; int32_t cmp = (*param->filterFunc)((void *)&p->btime, (void *)&pBtimeKey->btime, param->type); - if (cmp == 0) + if (cmp == 0) { if (taosArrayPush(pUids, &p->uid) == NULL) { ret = terrno; break; - } else { - if (param->equal == true) { - if (count > TRY_ERROR_LIMIT) break; - count++; - } } + } else { + if (param->equal == true) { + if (count > TRY_ERROR_LIMIT) break; + count++; + } + } valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); if (valid < 0) break; } while (1); From 49ef380e97277f6693343e19df24cbae9bc729d4 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 Date: Sat, 27 Jul 2024 22:40:36 +0800 Subject: [PATCH 4/5] fix tsma test for invalid paramters when creating tsma --- source/dnode/mnode/impl/src/mndStream.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 19ad9e3540..688b1eb9cb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -852,7 +852,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } SName name = {0}; - code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE); if (code) { goto _OVER; } @@ -3133,4 +3133,4 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, mndTransDrop(pTrans); return TSDB_CODE_ACTION_IN_PROGRESS; -} \ No newline at end of file +} From 1cbfed8a691ebfd9f8c691a03ea3450a48bb2ad9 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 Date: Sun, 28 Jul 2024 09:29:16 +0800 Subject: [PATCH 5/5] fix tsma py test --- tests/system-test/2-query/tsma.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index f2900634e8..29a7562b45 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -12,6 +12,7 @@ from util.common import * ROUND = 1000 ignore_some_tests: int = 1 +wait_query_seconds = 30 class TSMA: def __init__(self): @@ -755,13 +756,18 @@ class TDTestCase: time.sleep(1) time.sleep(1) - def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str, check_tsma_calculation : str=True): + def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str, check_tsma_calculation : str=True, expected_tsma_name: str = ''): tdSql.execute('use %s' % db) sql = "CREATE TSMA %s ON %s.%s FUNCTION(%s) INTERVAL(%s)" % ( tsma_name, db, tb, ','.join(func_list), interval) tdSql.execute(sql, queryTimes=1) + tsma_name_trim = tsma_name + if tsma_name[0] == '`': + tsma_name_trim = tsma_name[1:-1] + if expected_tsma_name != '': + tsma_name_trim = expected_tsma_name if check_tsma_calculation == True: - self.wait_for_tsma_calculation(func_list, db, tb, interval, tsma_name) + self.wait_for_tsma_calculation(func_list, db, tb, interval, tsma_name_trim) def create_error_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str, expectedErrno: int): tdSql.execute('use %s' % db) @@ -1276,7 +1282,7 @@ class TDTestCase: tdSql.error('drop tsma test.tsma1', -2147482491) tdSql.execute('drop tsma test.tsma2', queryTimes=1) tdSql.execute('drop tsma test.tsma1', queryTimes=1) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') + self.wait_query('show transactions', 0, wait_query_seconds, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1317,7 +1323,7 @@ class TDTestCase: 'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096) tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') + self.wait_query('show transactions', 0, wait_query_seconds, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database nsdb') # drop norm table @@ -1331,12 +1337,12 @@ class TDTestCase: self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') tdSql.execute('alter table test.t0 ttl 2', queryTimes=1) tdSql.execute('flush database test') - self.wait_query('show test.tables like "%t0"', 0, 10) + self.wait_query('show test.tables like "%t0"', 0, wait_query_seconds) # test drop multi tables tdSql.execute('drop table test.t3, test.t4') - self.wait_query('show test.tables like "%t3"', 0, 1) - self.wait_query('show test.tables like "%t4"', 0, 1) + self.wait_query('show test.tables like "%t3"', 0, wait_query_seconds) + self.wait_query('show test.tables like "%t4"', 0, wait_query_seconds) tdSql.query('show test.tables like "%tsma%"') tdSql.checkRows(0) @@ -1344,7 +1350,7 @@ class TDTestCase: # test drop stream tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') + self.wait_query('show transactions', 0, wait_query_seconds, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1446,7 +1452,7 @@ class TDTestCase: ['avg(c1)', 'avg(c2)'], 'nsdb', 'meters', '10m', 'tsma1') tdSql.execute('drop tsma nsdb.tsma1', queryTimes=1) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') + self.wait_query('show transactions', 0, wait_query_seconds, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database nsdb') def test_create_tsma_on_norm_table(self): @@ -1542,9 +1548,9 @@ class TDTestCase: self.drop_tsma('tsma_repeat', 'test') # tsma name include escape character - tdSql.execute("CREATE TSMA `129_tsma` ON test.meters FUNCTION(count(c3)) INTERVAL(5m); ") - tdSql.execute("CREATE TSMA `129_Tsma` ON test.meters FUNCTION(count(c3)) INTERVAL(5m); ") - tdSql.execute("CREATE TSMA `129_T*\-sma` ON test.meters FUNCTION(count(c3)) INTERVAL(5m); ") + self.create_tsma('`129_tsma`', 'test', 'meters', ['count(c3)'], '5m') + self.create_tsma('`129_Tsma`', 'test', 'meters', ['count(c3)'], '9m') + self.create_tsma('`129_T*\-sma`', 'test', 'meters', ['count(c3)'], '10m', expected_tsma_name='129_T*\\\\-sma') tdSql.execute("drop tsma test.`129_tsma`") tdSql.execute("drop tsma test.`129_Tsma`") tdSql.execute("drop tsma test.`129_T*\-sma`")