diff --git a/docs/examples/c/CMakeLists.txt b/docs/examples/c/CMakeLists.txt new file mode 100644 index 0000000000..f636084bab --- /dev/null +++ b/docs/examples/c/CMakeLists.txt @@ -0,0 +1,101 @@ +PROJECT(TDengine) + +IF (TD_LINUX) + INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc) + AUX_SOURCE_DIRECTORY(. SRC) + + add_executable(docs_connect_example "") + add_executable(docs_create_db_demo "") + add_executable(docs_insert_data_demo "") + add_executable(docs_query_data_demo "") + add_executable(docs_with_reqid_demo "") + add_executable(docs_sml_insert_demo "") + add_executable(docs_stmt_insert_demo "") + add_executable(docs_tmq_demo "") + + target_sources(docs_connect_example + PRIVATE + "connect_example.c" + ) + + target_sources(docs_create_db_demo + PRIVATE + "create_db_demo.c" + ) + + target_sources(docs_insert_data_demo + PRIVATE + "insert_data_demo.c" + ) + + target_sources(docs_query_data_demo + PRIVATE + "query_data_demo.c" + ) + + target_sources(docs_with_reqid_demo + PRIVATE + "with_reqid_demo.c" + ) + + target_sources(docs_sml_insert_demo + PRIVATE + "sml_insert_demo.c" + ) + + target_sources(docs_stmt_insert_demo + PRIVATE + "stmt_insert_demo.c" + ) + + target_sources(docs_tmq_demo + PRIVATE + "tmq_demo.c" + ) + + target_link_libraries(docs_connect_example + taos + ) + + target_link_libraries(docs_create_db_demo + taos + ) + + target_link_libraries(docs_insert_data_demo + taos + ) + + target_link_libraries(docs_query_data_demo + taos + ) + + target_link_libraries(docs_with_reqid_demo + taos + ) + + target_link_libraries(docs_sml_insert_demo + taos + ) + + target_link_libraries(docs_stmt_insert_demo + taos + ) + + target_link_libraries(docs_tmq_demo + taos + pthread + ) + + SET_TARGET_PROPERTIES(docs_connect_example PROPERTIES OUTPUT_NAME docs_connect_example) + SET_TARGET_PROPERTIES(docs_create_db_demo PROPERTIES OUTPUT_NAME docs_create_db_demo) + SET_TARGET_PROPERTIES(docs_insert_data_demo PROPERTIES OUTPUT_NAME docs_insert_data_demo) + SET_TARGET_PROPERTIES(docs_query_data_demo PROPERTIES OUTPUT_NAME docs_query_data_demo) + SET_TARGET_PROPERTIES(docs_with_reqid_demo PROPERTIES OUTPUT_NAME docs_with_reqid_demo) + SET_TARGET_PROPERTIES(docs_sml_insert_demo PROPERTIES OUTPUT_NAME docs_sml_insert_demo) + SET_TARGET_PROPERTIES(docs_stmt_insert_demo PROPERTIES OUTPUT_NAME docs_stmt_insert_demo) + SET_TARGET_PROPERTIES(docs_tmq_demo PROPERTIES OUTPUT_NAME docs_tmq_demo) +ENDIF () +IF (TD_DARWIN) + INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc) + AUX_SOURCE_DIRECTORY(. SRC) +ENDIF () diff --git a/docs/examples/c/Makefile b/docs/examples/c/Makefile new file mode 100644 index 0000000000..9fda575ec6 --- /dev/null +++ b/docs/examples/c/Makefile @@ -0,0 +1,34 @@ +# Makefile for building TDengine examples on TD Linux platform + +INCLUDE_DIRS = + +TARGETS = connect_example \ + create_db_demo \ + insert_data_demo \ + query_data_demo \ + with_reqid_demo \ + sml_insert_demo \ + stmt_insert_demo \ + tmq_demo + +SOURCES = connect_example.c \ + create_db_demo.c \ + insert_data_demo.c \ + query_data_demo.c \ + with_reqid_demo.c \ + sml_insert_demo.c \ + stmt_insert_demo.c \ + tmq_demo.c + +LIBS = -ltaos -lpthread + + +CFLAGS = -g + +all: $(TARGETS) + +$(TARGETS): + $(CC) $(CFLAGS) -o $@ $(wildcard $(@F).c) $(LIBS) + +clean: + rm -f $(TARGETS) \ No newline at end of file diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d4ad2dc0ad..8f483d8e88 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1685,6 +1685,7 @@ typedef struct { typedef struct { int32_t openVnodes; + int32_t dropVnodes; int32_t totalVnodes; int32_t masterNum; int64_t numOfSelectReqs; @@ -2825,8 +2826,8 @@ enum { TOPIC_SUB_TYPE__COLUMN, }; -#define DEFAULT_MAX_POLL_INTERVAL 300000 -#define DEFAULT_SESSION_TIMEOUT 12000 +#define DEFAULT_MAX_POLL_INTERVAL 300000 +#define DEFAULT_SESSION_TIMEOUT 12000 typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 07d56f9b07..50c096258e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -47,7 +47,7 @@ extern "C" { #define SYNC_HEARTBEAT_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_SNAP_RESEND_MS 1000 * 60 -#define SYNC_SNAP_TIMEOUT_MS 1000 * 300 +#define SYNC_SNAP_TIMEOUT_MS 1000 * 180 #define SYNC_VND_COMMIT_MIN_MS 3000 diff --git a/include/os/osString.h b/include/os/osString.h index 5f211ad2ee..30bfd61b62 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -51,7 +51,12 @@ typedef enum { M2C = 0, C2M } ConvType; #define strtod STR_TO_LD_FUNC_TAOS_FORBID #define strtold STR_TO_D_FUNC_TAOS_FORBID #define strtof STR_TO_F_FUNC_TAOS_FORBID + +#ifdef strndup +#undef strndup +#endif #define strndup STR_TO_F_FUNC_TAOS_FORBID + #endif #define tstrncpy(dst, src, size) \ diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index d0ea7055de..8ce4685716 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -23,12 +23,12 @@ #include "tglobal.h" #include "tmsgtype.h" -#define RAW_NULL_CHECK(c) \ - do { \ - if (c == NULL) { \ - code = terrno; \ - goto end; \ - } \ +#define RAW_NULL_CHECK(c) \ + do { \ + if (c == NULL) { \ + code = terrno; \ + goto end; \ + } \ } while (0) #define RAW_FALSE_CHECK(c) \ @@ -47,7 +47,7 @@ } \ } while (0) -#define LOG_ID_TAG "connId:0x%" PRIx64 ",qid:0x%" PRIx64 +#define LOG_ID_TAG "connId:0x%" PRIx64 ",QID:0x%" PRIx64 #define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId #define TMQ_META_VERSION "1.0" @@ -1188,7 +1188,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { pCreateReq->ctb.suid = pTableMeta->uid; SArray* pTagVals = NULL; - code = tTagToValArray((STag *)pCreateReq->ctb.pTag, &pTagVals); + code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pTableMeta); goto end; @@ -1206,18 +1206,19 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) { STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); if (pTagVal) { - if (pTagVal->cid != tag->colId){ + if (pTagVal->cid != tag->colId) { pTagVal->cid = tag->colId; rebuildTag = true; } } else { - uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name, (int)taosArrayGetSize(pTagVals), i, tag->colId); + uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name, + (int)taosArrayGetSize(pTagVals), i, tag->colId); } } } } taosMemoryFreeClear(pTableMeta); - if (rebuildTag){ + if (rebuildTag) { STag* ppTag = NULL; code = tTagNew(pTagVals, 1, false, &ppTag); taosArrayDestroy(pTagVals); @@ -1815,7 +1816,7 @@ end: static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) { // find schema data info - int32_t code = 0; + int32_t code = 0; SVCreateTbReq pCreateReq = {0}; SDecoder decoderTmp = {0}; @@ -1826,15 +1827,16 @@ static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) { RAW_NULL_CHECK(lenTmp); tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); - RAW_RETURN_CHECK (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq)); + RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq)); if (pCreateReq.type != TSDB_CHILD_TABLE) { code = TSDB_CODE_INVALID_MSG; goto end; } - if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL){ - RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq))); - } else{ + if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) { + RAW_RETURN_CHECK( + taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq))); + } else { tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); pCreateReq = (SVCreateTbReq){0}; } @@ -1927,7 +1929,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) // find schema data info SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); - SVgroupInfo vg = {0}; + SVgroupInfo vg = {0}; RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg)); if (pCreateReqDst) { // change stable name to get meta (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); @@ -1957,10 +1959,10 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) fields[i].bytes = pSW->pSchema[i].bytes; tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } - void* rawData = getRawDataFromRes(pRetrieve); - char err[ERR_MSG_LEN] = {0}; + void* rawData = getRawDataFromRes(pRetrieve); + char err[ERR_MSG_LEN] = {0}; SVCreateTbReq* pCreateReqTmp = NULL; - if (pCreateReqDst){ + if (pCreateReqDst) { RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp)); } code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 6b01b92445..0e1a4bc98e 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -77,6 +77,7 @@ typedef struct { typedef struct { int32_t vnodeNum; int32_t opened; + int32_t dropped; int32_t failed; bool updateVnodesList; int32_t threadIndex; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index d081e70ff0..3cf0382eba 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -311,6 +311,8 @@ static void *vmOpenVnodeInThread(void *param) { snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0); pThread->updateVnodesList = true; + pThread->dropped++; + (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1); continue; } @@ -352,8 +354,8 @@ static void *vmOpenVnodeInThread(void *param) { (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); } - dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, - pThread->failed); + dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, + pThread->opened, pThread->dropped, pThread->failed); return NULL; } @@ -427,7 +429,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { taosMemoryFree(threads); taosMemoryFree(pCfgs); - if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) { + if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) { dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes); terrno = TSDB_CODE_VND_INIT_FAILED; return -1; @@ -774,6 +776,7 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) { } pMgmt->state.openVnodes = 0; + pMgmt->state.dropVnodes = 0; dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum); for (int32_t t = 0; t < threadNum; ++t) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 6f7b24eab2..40bb99d6b5 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1637,6 +1637,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool pTrans->code = code; bool continueExec = true; if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { + taosMsleep(100); continueExec = true; } else { continueExec = false; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 14207e7fb3..484c5c0a16 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1523,6 +1523,7 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) { } memcpy(info.pTagVal, pCur->pVal, pCur->vLen); if (taosArrayPush(pUidTagInfo, &info) == NULL) { + taosMemoryFreeClear(info.pTagVal); metaCloseCtbCursor(pCur); taosHashCleanup(pSepecifiedUidMap); return terrno; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 05ccc2f5c3..db1bbc3fa7 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -364,7 +364,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { } pRspHead->vgId = htonl(req.upstreamNodeId); - if(pRspHead->vgId == 0) { + if (pRspHead->vgId == 0) { tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId); return TSDB_CODE_INVALID_MSG; } @@ -461,7 +461,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (code != TSDB_CODE_SUCCESS) { // return error not send rsp manually tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId, req.srcTaskId, tstrerror(code)); - } else { // send rsp manually only on success. + } else { // send rsp manually only on success. SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamTaskSendRetrieveRsp(&req, &rsp); } @@ -516,7 +516,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe } tDecoderClear(&decoder); - tqDebug("tq task:0x%x (vgId:%d) recv check rsp(qid:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, + tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); if (!isLeader) { @@ -1273,7 +1273,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMutexLock(&pTask->lock); if (pTask->chkInfo.checkpointId < req.checkpointId) { - tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%"PRId64, + tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64, pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId); streamMutexUnlock(&pTask->lock); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 37ab0a53f9..85f74b1672 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -22,6 +22,12 @@ #define ROCKS_BATCH_SIZE (4096) +void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) { + if (!taosLRUCacheRelease(cache, handle, eraseIfLastRef)) { + tsdbTrace(" release lru cache failed"); + } +} + static int32_t tsdbOpenBCache(STsdb *pTsdb) { int32_t code = 0, lino = 0; int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; @@ -579,7 +585,7 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud if (pLastCol->dirty) { if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) { STsdb *pTsdb = (STsdb *)ud; - tsdbError("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__); + tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__); } } @@ -713,9 +719,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, { SLastCol *pLastCol = NULL; code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol); - if (code != TSDB_CODE_SUCCESS) { - tsdbWarn("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); + if (code == TSDB_CODE_INVALID_PARA) { + tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + } else if (code != TSDB_CODE_SUCCESS) { + tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + goto _exit; } if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[0], klen); @@ -724,9 +734,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, pLastCol = NULL; code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol); - if (code != TSDB_CODE_SUCCESS) { - tsdbWarn("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); + if (code == TSDB_CODE_INVALID_PARA) { + tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + } else if (code != TSDB_CODE_SUCCESS) { + tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + goto _exit; } if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[1], klen); @@ -739,9 +753,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, for (int i = 0; i < 2; i++) { LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen); if (h) { - if (taosLRUCacheRelease(pTsdb->lruCache, h, true)) { - tsdbInfo("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__); - } + tsdbLRUCacheRelease(pTsdb->lruCache, h, true); taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen); } } @@ -770,17 +782,13 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } } } else { @@ -798,17 +806,13 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } } @@ -827,10 +831,8 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra code = tsdbCacheCommitNoLock(pTsdb); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } if (pSchemaRow != NULL) { @@ -845,10 +847,8 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } } } else { @@ -871,10 +871,8 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } } @@ -895,10 +893,8 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { code = tsdbCacheCommitNoLock(pTsdb); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } STSchema *pTSchema = NULL; @@ -924,11 +920,8 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - taosMemoryFree(pTSchema); - TAOS_RETURN(code); } } } @@ -949,17 +942,13 @@ int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } // rocksMayWrite(pTsdb, true, false, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -974,18 +963,14 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h code = tsdbCacheCommitNoLock(pTsdb); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } rocksMayWrite(pTsdb, false); @@ -1005,17 +990,13 @@ int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } } @@ -1031,10 +1012,8 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool code = tsdbCacheCommitNoLock(pTsdb); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } for (int i = 0; i < TARRAY_SIZE(uids); ++i) { @@ -1042,10 +1021,8 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } } @@ -1174,9 +1151,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray } } - if (!taosLRUCacheRelease(pCache, h, false)) { - tsdbInfo("vgId:%d, %s release lru cache failed at line %d", TD_VID(pTsdb->pVnode), __func__, __LINE__); - } + tsdbLRUCacheRelease(pCache, h, false); TAOS_CHECK_EXIT(code); } else { if (!remainCols) { @@ -1237,9 +1212,13 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray SLastCol *pLastCol = NULL; code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); - if (code != TSDB_CODE_SUCCESS) { - tsdbWarn("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); + if (code == TSDB_CODE_INVALID_PARA) { + tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + } else if (code != TSDB_CODE_SUCCESS) { + tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + goto _exit; } /* if (code) { @@ -1391,9 +1370,8 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 } code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, + tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - TAOS_CHECK_GOTO(code, &lino, _exit); } } } @@ -1402,9 +1380,8 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 // 3. do update code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); if (code < TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - TAOS_CHECK_GOTO(code, &lino, _exit); } _exit: @@ -1491,9 +1468,8 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo // 3. do update code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - TAOS_CHECK_GOTO(code, &lino, _exit); } _exit: @@ -1709,9 +1685,13 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA } code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); - if (code != TSDB_CODE_SUCCESS) { - tsdbWarn("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); + if (code == TSDB_CODE_INVALID_PARA) { + tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + } else if (code != TSDB_CODE_SUCCESS) { + tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + goto _exit; } SLastCol *pToFree = pLastCol; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; @@ -1827,11 +1807,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache } if (h) { - code = taosLRUCacheRelease(pCache, h, false); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__); - goto _exit; - } + tsdbLRUCacheRelease(pCache, h, false); } } @@ -1858,13 +1834,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache // no cache or cache is invalid ++i; } - if (h) { - code = taosLRUCacheRelease(pCache, h, false); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__); - goto _exit; - } + tsdbLRUCacheRelease(pCache, h, false); } } @@ -1900,10 +1871,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE code = tsdbCacheCommit(pTsdb); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s commit failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tsdbTrace("vgId:%d, %s commit failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); } (void)taosThreadMutexLock(&pTsdb->lruMutex); @@ -1922,9 +1891,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE .cacheStatus = TSDB_LAST_CACHE_NO_CACHE}; code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1); } - if (taosLRUCacheRelease(pTsdb->lruCache, h, false) != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__); - } + tsdbLRUCacheRelease(pTsdb->lruCache, h, false); TAOS_CHECK_EXIT(code); } else { if (!remainCols) { @@ -1976,9 +1943,13 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE for (int i = 0; i < numKeys; ++i) { SLastCol *pLastCol = NULL; code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); - if (code != TSDB_CODE_SUCCESS) { - tsdbWarn("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); + if (code == TSDB_CODE_INVALID_PARA) { + tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + } else if (code != TSDB_CODE_SUCCESS) { + tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + tstrerror(code)); + goto _exit; } SIdxKey *idxKey = taosArrayGet(remainCols, i); SLastKey *pLastKey = &idxKey->key; @@ -3483,11 +3454,7 @@ _err: TAOS_RETURN(code); } -void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { - if (taosLRUCacheRelease(pCache, h, false)) { - tsdbError("%s release lru cache failed at line %d.", __func__, __LINE__); - } -} +void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { tsdbLRUCacheRelease(pCache, h, false); } void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) { taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity); @@ -3633,4 +3600,4 @@ void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t * (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex); tsdbCacheRelease(pFD->pTsdb->pgCache, handle); -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 5b6511a38e..36bfb56120 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -704,7 +704,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead pReader->cost.headFileLoadTime += (et1 - st) / 1000.0; -_end: +//_end: // tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); return code; } @@ -1872,9 +1872,9 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); } -static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader, - STableBlockScanInfo* pScanInfo, SRowKey* pSttKey, STsdbReader* pReader, - bool* copied) { +static int32_t tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader, + STableBlockScanInfo* pScanInfo, SRowKey* pSttKey, STsdbReader* pReader, + bool* copied) { int32_t code = TSDB_CODE_SUCCESS; *copied = false; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 46f4f86484..525573ee01 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -72,7 +72,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; @@ -94,7 +94,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) { ctx->pNames = param; ctx->pResList = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes)); if (NULL == ctx->pResList) { - qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbMetaNum, + qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbMetaNum, (int32_t)sizeof(SMetaRes)); ctgFreeTask(&task, true); CTG_ERR_RET(terrno); @@ -105,7 +105,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, tbNum:%d", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); return TSDB_CODE_SUCCESS; @@ -133,7 +133,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; @@ -161,7 +161,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; @@ -189,7 +189,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; @@ -223,7 +223,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; @@ -245,7 +245,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { ctx->pNames = param; ctx->pResList = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes)); if (NULL == ctx->pResList) { - qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbHashNum, + qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbHashNum, (int32_t)sizeof(SMetaRes)); ctgFreeTask(&task, true); CTG_ERR_RET(terrno); @@ -256,7 +256,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, tbNum:%d", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); return TSDB_CODE_SUCCESS; @@ -275,7 +275,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -293,7 +293,7 @@ int32_t ctgInitGetDnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -320,7 +320,7 @@ int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; @@ -348,7 +348,7 @@ int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; @@ -376,7 +376,7 @@ int32_t ctgInitGetUserTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user); return TSDB_CODE_SUCCESS; @@ -394,7 +394,7 @@ int32_t ctgInitGetSvrVerTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -426,7 +426,7 @@ int32_t ctgInitGetTbIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; @@ -459,7 +459,7 @@ int32_t ctgInitGetTbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; @@ -492,7 +492,7 @@ int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; @@ -514,7 +514,7 @@ int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { ctx->pNames = param; ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes)); if (NULL == ctx->pResList) { - qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum, + qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum, (int32_t)sizeof(SMetaRes)); ctgFreeTask(&task, true); CTG_ERR_RET(terrno); @@ -525,7 +525,7 @@ int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { CTG_ERR_RET(terrno); } - qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, viewNum:%d", pJob->queryId, taskIdx, + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, viewNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->viewNum); return TSDB_CODE_SUCCESS; @@ -546,7 +546,7 @@ int32_t ctgInitGetTbTSMATask(SCtgJob* pJob, int32_t taskId, void* param) { pTaskCtx->pNames = param; pTaskCtx->pResList = taosArrayInit(pJob->tbTsmaNum, sizeof(SMetaRes)); if (NULL == pTaskCtx->pResList) { - qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbTsmaNum, + qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbTsmaNum, (int32_t)sizeof(SMetaRes)); ctgFreeTask(&task, true); CTG_ERR_RET(terrno); @@ -574,7 +574,7 @@ int32_t ctgInitGetTSMATask(SCtgJob* pJob, int32_t taskId, void* param) { pTaskCtx->pNames = param; pTaskCtx->pResList = taosArrayInit(pJob->tsmaNum, sizeof(SMetaRes)); if (NULL == pTaskCtx->pResList) { - qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tsmaNum, + qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tsmaNum, (int32_t)sizeof(SMetaRes)); ctgFreeTask(&task, true); CTG_ERR_RET(terrno); @@ -603,7 +603,7 @@ static int32_t ctgInitGetTbNamesTask(SCtgJob* pJob, int32_t taskId, void* param) pTaskCtx->pNames = param; pTaskCtx->pResList = taosArrayInit(pJob->tbNameNum, sizeof(SMetaRes)); if (NULL == pTaskCtx->pResList) { - qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbNameNum, + qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbNameNum, (int32_t)sizeof(SMetaRes)); ctgFreeTask(&task, true); CTG_ERR_RET(terrno); @@ -1048,7 +1048,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const } double el = (taosGetTimestampUs() - st) / 1000.0; - qDebug("qid:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms", + qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms", pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate, el); return TSDB_CODE_SUCCESS; @@ -1450,16 +1450,17 @@ _return: int32_t ctgCallUserCb(void* param) { SCtgJob* pJob = (SCtgJob*)param; - qDebug("qid:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); + qDebug("QID:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode); - qDebug("qid:0x%" PRIx64 " ctg end to call user cb", pJob->queryId); + qDebug("QID:0x%" PRIx64 " ctg end to call user cb", pJob->queryId); int64_t refId = pJob->refId; int32_t code = taosRemoveRef(gCtgMgmt.jobPool, refId); if (code) { - qError("qid:0x%" PRIx64 " remove ctg job %" PRId64 " from jobPool failed, error:%s", pJob->queryId, refId, tstrerror(code)); + qError("QID:0x%" PRIx64 " remove ctg job %" PRId64 " from jobPool failed, error:%s", pJob->queryId, refId, + tstrerror(code)); } return TSDB_CODE_SUCCESS; @@ -1469,7 +1470,7 @@ void ctgUpdateJobErrCode(SCtgJob* pJob, int32_t errCode) { if (!NEED_CLIENT_REFRESH_VG_ERROR(errCode) || errCode == TSDB_CODE_SUCCESS) return; atomic_store_32(&pJob->jobResCode, errCode); - qDebug("qid:0x%" PRIx64 " ctg job errCode updated to %s", pJob->queryId, tstrerror(errCode)); + qDebug("QID:0x%" PRIx64 " ctg job errCode updated to %s", pJob->queryId, tstrerror(errCode)); return; } @@ -1481,7 +1482,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { return TSDB_CODE_SUCCESS; } - qDebug("qid:0x%" PRIx64 " task %d end with res %s", pJob->queryId, pTask->taskId, tstrerror(rspCode)); + qDebug("QID:0x%" PRIx64 " task %d end with res %s", pJob->queryId, pTask->taskId, tstrerror(rspCode)); pTask->code = rspCode; pTask->status = CTG_TASK_DONE; @@ -1490,7 +1491,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1); if (taskDone < taosArrayGetSize(pJob->pTasks)) { - qDebug("qid:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, + qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, (int32_t)taosArrayGetSize(pJob->pTasks)); ctgUpdateJobErrCode(pJob, rspCode); @@ -4347,7 +4348,7 @@ int32_t ctgLaunchJob(SCtgJob* pJob) { CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - qDebug("qid:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); + qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); pTask = taosArrayGet(pJob->pTasks, i); @@ -4360,7 +4361,7 @@ int32_t ctgLaunchJob(SCtgJob* pJob) { } if (taskNum <= 0) { - qDebug("qid:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); + qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); CTG_ERR_RET(taosAsyncExec(ctgCallUserCb, pJob, NULL)); #if CTG_BATCH_FETCH diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index d6e941c819..ed9dc81dd7 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -47,7 +47,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu msgNum = 0; } - ctgDebug("qid:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, + ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); @@ -114,7 +114,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu pMsgCtx->pBatchs = pBatchs; - ctgDebug("qid:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, + ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); (void)(*gCtgAsyncFps[pTask->type].handleRspFp)( @@ -454,7 +454,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - qDebug("qid:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, + qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); #if CTG_BATCH_FETCH @@ -702,7 +702,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (TDMT_VND_TABLE_CFG == msgType) { SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; pName = ctx->pName; - } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) { + } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); @@ -808,7 +808,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) { SCtgBatch* pBatch = (SCtgBatch*)p; int32_t msgSize = 0; - ctgDebug("qid:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); + ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize)); code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs, @@ -1124,10 +1124,11 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n int32_t code = tNameExtractFullName(name, tbFName); if (code) { - ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname); + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, + name->dbname, name->tname); CTG_ERR_RET(code); } - + code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); @@ -1450,7 +1451,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t code = tNameExtractFullName(pTableName, tbFName); if (code) { - ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, + pTableName->dbname, pTableName->tname); CTG_ERR_RET(code); } @@ -1523,7 +1525,8 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t code = tNameExtractFullName(pTableName, tbFName); if (code) { - ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, + pTableName->dbname, pTableName->tname); CTG_ERR_RET(code); } @@ -1632,10 +1635,11 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* int32_t reqType = TDMT_MND_VIEW_META; SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; - char fullName[TSDB_TABLE_FNAME_LEN]; + char fullName[TSDB_TABLE_FNAME_LEN]; int32_t code = tNameExtractFullName(pName, fullName); if (code) { - ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pName->type, pName->dbname, pName->tname); + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pName->type, + pName->dbname, pName->tname); CTG_ERR_RET(code); } @@ -1693,10 +1697,11 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa int32_t msgLen = 0; SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; int32_t code = tNameExtractFullName(name, tbFName); if (code) { - ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname); + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, + name->dbname, name->tname); CTG_ERR_RET(code); } @@ -1757,10 +1762,11 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c char tbFName[TSDB_TABLE_FNAME_LEN]; int32_t code = tNameExtractFullName(pTbName, tbFName); if (code) { - ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTbName->type, pTbName->dbname, pTbName->tname); + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTbName->type, + pTbName->dbname, pTbName->tname); CTG_ERR_RET(code); } - + SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 96cd783d2f..86a38017bd 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -452,7 +452,8 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardN code = taosHashRemove(dbCache->tbCache, key, len); if (code) { - qError("taosHashRemove table cache failed, key:%s, len:%d, error:%s", (char*)key, (int32_t)len, tstrerror(code)); + qError("taosHashRemove table cache failed, key:%s, len:%d, error:%s", (char*)key, (int32_t)len, + tstrerror(code)); } cacheSize = @@ -1096,7 +1097,7 @@ void ctgFreeJob(void* job) { taosMemoryFree(job); - qDebug("qid:0x%" PRIx64 ", ctg job 0x%" PRIx64 " freed", qid, rid); + qDebug("QID:0x%" PRIx64 ", ctg job 0x%" PRIx64 " freed", qid, rid); } int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target) { @@ -1241,10 +1242,11 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* d char tbFullName[TSDB_TABLE_FNAME_LEN]; code = tNameExtractFullName(pTableName, tbFullName); if (code) { - ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); + ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, + pTableName->dbname, pTableName->tname); CTG_ERR_RET(code); } - + uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix); @@ -1704,7 +1706,8 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) { } int32_t ctgUpdateSendTargetInfo(SMsgSendInfo* pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) { - if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META || msgType == TDMT_VND_TABLE_NAME) { + if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META || + msgType == TDMT_VND_TABLE_NAME) { pMsgSendInfo->target.type = TARGET_TYPE_VNODE; pMsgSendInfo->target.vgId = vgId; pMsgSendInfo->target.dbFName = taosStrdup(dbFName); @@ -2010,7 +2013,8 @@ int32_t ctgChkSetTbAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) { char dbFName[TSDB_DB_FNAME_LEN]; code = tNameExtractFullName(&req->pRawReq->tbName, tbFName); if (code) { - ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); + ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), + req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); CTG_ERR_RET(code); } @@ -2201,7 +2205,8 @@ int32_t ctgChkSetViewAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) } else { code = tNameExtractFullName(&req->pRawReq->tbName, viewFName); if (code) { - ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); + ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), + req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); CTG_ERR_RET(code); } } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index be90e22091..05654f65c1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1143,11 +1143,11 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S varDataSetLen(tmp, tagVal.nData); memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); code = colDataSetVal(pColInfo, i, tmp, false); - QUERY_CHECK_CODE(code, lino, _end); #if TAG_FILTER_DEBUG qDebug("tagfilter varch:%s", tmp + 2); #endif taosMemoryFree(tmp); + QUERY_CHECK_CODE(code, lino, _end); } else { code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false); QUERY_CHECK_CODE(code, lino, _end); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index b673f88d50..6e7da8a8db 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1148,12 +1148,12 @@ _end: return code; } -static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { - pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols; - for (int i = 0; i < pFillSup->numOfAllCols; i++) { - SFillColInfo* pCol = &pFillSup->pAllColInfo[i]; - SResSchema* pSchema = &pCol->pExpr->base.resSchema; - pFillSup->rowSize += pSchema->bytes; +static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) { + int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock); + pFillSup->rowSize = sizeof(SResultCellData) * numOfCols; + for (int i = 0; i < numOfCols; i++) { + SColumnInfoData* pCol = taosArrayGet(pInputRes->pDataBlock, i); + pFillSup->rowSize += pCol->info.bytes; } pFillSup->next.key = INT64_MIN; pFillSup->nextNext.key = INT64_MIN; @@ -1168,7 +1168,7 @@ static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { } static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval, - SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI) { + SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); @@ -1197,7 +1197,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod pFillSup->interval = *pInterval; pFillSup->pAPI = pAPI; - code = initResultBuf(pFillSup); + code = initResultBuf(pInputRes, pFillSup); QUERY_CHECK_CODE(code, lino, _end); SExprInfo* noFillExpr = NULL; @@ -1389,7 +1389,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); - pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); + pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno); + + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI, + pInfo->pSrcBlock); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _error); @@ -1398,8 +1402,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); - pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); - QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno); + code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 43159bce20..c3aa95f5b7 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -37,7 +37,7 @@ static int32_t dumpQueryPlan(SQueryPlan* pPlan) { char* pStr = NULL; code = nodesNodeToString((SNode*)pPlan, false, &pStr, NULL); if (TSDB_CODE_SUCCESS == code) { - planDebugL("qid:0x%" PRIx64 " Query Plan, JsonPlan: %s", pPlan->queryId, pStr); + planDebugL("QID:0x%" PRIx64 " Query Plan, JsonPlan: %s", pPlan->queryId, pStr); taosMemoryFree(pStr); } return code; @@ -123,7 +123,7 @@ int32_t qContinuePlanPostQuery(void* pPostPlan) { } int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { - planDebug("qid:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId); + planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId); return setSubplanExecutionNode(subplan->pNode, groupId, pSource); } @@ -143,7 +143,7 @@ static void clearSubplanExecutionNode(SPhysiNode* pNode) { } void qClearSubplanExecutionNode(SSubplan* pSubplan) { - planDebug("qid:0x%" PRIx64 " clear subplan execution node, groupId:%d", pSubplan->id.queryId, pSubplan->id.groupId); + planDebug("QID:0x%" PRIx64 " clear subplan execution node, groupId:%d", pSubplan->id.queryId, pSubplan->id.groupId); clearSubplanExecutionNode(pSubplan->pNode); } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 4f08c93c1e..2a4951d237 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -628,8 +628,8 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell cell = cell->pNext) { pWhenThen = (SWhenThenNode *)node; - SCL_ERR_RET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen)); - SCL_ERR_RET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen)); + SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen)); + SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen)); SCL_ERR_JRET(vectorCompareImpl(pCase, pWhen, pComp, rowIdx, 1, TSDB_ORDER_ASC, OP_TYPE_EQUAL)); @@ -646,6 +646,10 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell goto _return; } + sclFreeParam(pWhen); + sclFreeParam(pThen); + taosMemoryFreeClear(pWhen); + taosMemoryFreeClear(pThen); } if (pElse) { @@ -672,8 +676,8 @@ _return: sclFreeParam(pWhen); sclFreeParam(pThen); - taosMemoryFree(pWhen); - taosMemoryFree(pThen); + taosMemoryFreeClear(pWhen); + taosMemoryFreeClear(pThen); SCL_RET(code); } diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index f475c974cc..03145da939 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -728,7 +728,7 @@ void schFreeJobImpl(void *job) { uint64_t queryId = pJob->queryId; int64_t refId = pJob->refId; - qDebug("qid:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); + qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); schDropJobAllTasks(pJob); @@ -775,7 +775,7 @@ void schFreeJobImpl(void *job) { taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->fetchRes); taosMemoryFreeClear(pJob->sql); - int32_t code = tsem_destroy(&pJob->rspSem); + int32_t code = tsem_destroy(&pJob->rspSem); if (code) { qError("tsem_destroy failed, error:%s", tstrerror(code)); } @@ -786,7 +786,7 @@ void schFreeJobImpl(void *job) { schCloseJobRef(); } - qDebug("qid:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); + qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); } int32_t schJobFetchRows(SSchJob *pJob) { @@ -797,7 +797,7 @@ int32_t schJobFetchRows(SSchJob *pJob) { if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - code = tsem_wait(&pJob->rspSem); + code = tsem_wait(&pJob->rspSem); if (code) { qError("tsem_wait for fetch rspSem failed, error:%s", tstrerror(code)); SCH_RET(code); @@ -821,7 +821,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); if (NULL == pJob) { - qError("qid:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob)); + qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob)); SCH_ERR_JRET(terrno); } @@ -831,7 +831,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { if (pReq->sql) { pJob->sql = taosStrdup(pReq->sql); if (NULL == pJob->sql) { - qError("qid:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql); + qError("QID:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql); SCH_ERR_JRET(terrno); } } @@ -839,7 +839,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { if (pReq->allocatorRefId > 0) { pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId); if (pJob->allocatorRefId <= 0) { - qError("qid:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId); + qError("QID:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId); SCH_ERR_JRET(terrno); } } @@ -851,11 +851,11 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { pJob->pWorkerCb = pReq->pWorkerCb; if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) { - qDebug("qid:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); + qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); } else { pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL); if (NULL == pJob->nodeList) { - qError("qid:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, + qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, (int32_t)taosArrayGetSize(pReq->pNodeList)); SCH_ERR_JRET(terrno); } @@ -918,7 +918,7 @@ _return: int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { int32_t code = 0; - qDebug("qid:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId); + qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId); SCH_ERR_RET(schLaunchJob(pJob)); @@ -926,7 +926,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); code = tsem_wait(&pJob->rspSem); if (code) { - qError("qid:0x%" PRIx64 " tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(code)); + qError("QID:0x%" PRIx64 " tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(code)); SCH_ERR_RET(code); } } @@ -1191,7 +1191,7 @@ int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_ (void)schAcquireJob(rId, &pJob); if (NULL == pJob) { - qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId); + qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId); SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST); } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index fe601c6b86..b3106d8c7e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -500,7 +500,7 @@ _return: int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; - qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); // called if drop task rsp received code (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error @@ -513,7 +513,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; - qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); if (pMsg) { taosMemoryFree(pMsg->pData); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 375ad5fa37..4c609fa5e2 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -996,7 +996,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t code = 0; - qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId, + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status)); if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) { @@ -1043,12 +1043,12 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { continue; } - qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId); + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId); pJob = NULL; (void)schAcquireJob(localRsp->rId, &pJob); if (NULL == pJob) { - qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, + qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, localRsp->tId, localRsp->rId); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST); } @@ -1068,7 +1068,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { (void)schReleaseJob(pJob->refId); - qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, localRsp->tId, code); SCH_ERR_JRET(code); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 2688617823..75bcc326b3 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -108,7 +108,7 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) { pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); code = streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, - &pTask->outputInfo.fixedDispatcher.epSet); + &pTask->outputInfo.fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamTaskStartMonitorCheckRsp(pTask); @@ -171,14 +171,14 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage); SStreamTaskState pState = streamTaskGetStatus(pTask); - stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(qid:0x%" PRIx64 + stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(QID:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", pTask->id.idStr, pState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status); streamMetaReleaseTask(pMeta, pTask); } else { pRsp->status = TASK_DOWNSTREAM_NOT_READY; - stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(qid:0x%" PRIx64 + stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(QID:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp check_status %d", pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status); } @@ -259,7 +259,8 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa void* buf = rpcMallocCont(sizeof(SMsgHead) + len); if (buf == NULL) { - stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__, tstrerror(code)); + stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__, + tstrerror(code)); return terrno; } @@ -332,7 +333,7 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) { /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void processDownstreamReadyRsp(SStreamTask* pTask) { EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; - int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); + int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); if (code) { stError("s-task:%s failed to set event succ, code:%s", pTask->id.idStr, tstrerror(code)); } @@ -354,7 +355,7 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); - if (code != 0) { // todo: handle error + if (code != 0) { // todo: handle error stError("s-task:%s failed to handle halt event, code:%s", pTask->id.idStr, tstrerror(code)); } } @@ -373,8 +374,9 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t vgId = pTask->pMeta->vgId; - int32_t code = 0;; - bool existed = false; + int32_t code = 0; + ; + bool existed = false; streamMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 699774ed52..fd5591c488 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -120,38 +120,39 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + bool unQualified = false; + const char* id = pTask->id.idStr; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", pTask->id.idStr); + stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id); return TSDB_CODE_INVALID_MSG; } if (pRsp->rspCode != TSDB_CODE_SUCCESS) { - stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr, - pRsp->upstreamTaskId, tstrerror(pRsp->rspCode)); + stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId, + tstrerror(pRsp->rspCode)); return TSDB_CODE_SUCCESS; } streamMutexLock(&pTask->lock); SStreamTaskState status = streamTaskGetStatus(pTask); - if (status.state != TASK_STATUS__CK) { - stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name); - streamMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - streamMutexUnlock(&pTask->lock); - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - streamMutexLock(&pInfo->lock); - if (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId) { - stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name); - - streamMutexUnlock(&pInfo->lock); + if (status.state != TASK_STATUS__CK) { + stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } + streamMutexLock(&pInfo->lock); + unQualified = (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId); streamMutexUnlock(&pInfo->lock); + if (unQualified) { + stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions. int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId, pRsp->upstreamTaskId); @@ -963,11 +964,44 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** return 0; } +static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, SArray* pNotSendList) { + const char* id = pTask->id.idStr; + SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; + int32_t vgId = pTask->pMeta->vgId; + + int32_t code = doChkptStatusCheck(pTask); + if (code) { + return code; + } + + code = doFindNotSendUpstream(pTask, pList, &pNotSendList); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref); + return code; + } + + // do send retrieve checkpoint trigger msg to upstream + code = doSendRetrieveTriggerMsg(pTask, pNotSendList); + if (code) { + stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + code = 0; + } + + return code; +} + void checkpointTriggerMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; int32_t vgId = pTask->pMeta->vgId; int64_t now = taosGetTimestampMs(); const char* id = pTask->id.idStr; + SArray* pNotSendList = NULL; + SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg + int32_t code = 0; + int32_t numOfNotSend = 0; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; @@ -1008,40 +1042,18 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); + code = chkptTriggerRecvMonitorHelper(pTask, pNotSendList); + streamMutexUnlock(&pActiveInfo->lock); - int32_t code = doChkptStatusCheck(pTask); - if (code) { - streamMutexUnlock(&pActiveInfo->lock); + if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - // send msg to retrieve checkpoint trigger msg - SArray* pList = pTask->upstreamInfo.pList; - SArray* pNotSendList = NULL; - - code = doFindNotSendUpstream(pTask, pList, &pNotSendList); - if (code) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref); - streamMutexUnlock(&pActiveInfo->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - taosArrayDestroy(pNotSendList); return; } - // do send retrieve checkpoint trigger msg to upstream - int32_t size = taosArrayGetSize(pNotSendList); - code = doSendRetrieveTriggerMsg(pTask, pNotSendList); - if (code) { - stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); - } - - streamMutexUnlock(&pActiveInfo->lock); - // check every 100ms - if (size > 0) { + numOfNotSend = taosArrayGetSize(pNotSendList); + if (numOfNotSend > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); } else { @@ -1106,19 +1118,13 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { return code; } -bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) { +static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) { int64_t now = taosGetTimestampMs(); const char* id = pTask->id.idStr; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SStreamTaskState pStatus = streamTaskGetStatus(pTask); - if (pStatus.state != TASK_STATUS__CK) { - return false; - } - - streamMutexLock(&pInfo->lock); if (!pInfo->dispatchTrigger) { - streamMutexUnlock(&pInfo->lock); return false; } @@ -1146,14 +1152,29 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId); } - streamMutexUnlock(&pInfo->lock); return true; } - streamMutexUnlock(&pInfo->lock); return false; } +bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) { + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + SStreamTaskState pStatus = streamTaskGetStatus(pTask); + + if (pStatus.state != TASK_STATUS__CK) { + return false; + } + + streamMutexLock(&pInfo->lock); + bool send = isAlreadySendTriggerNoLock(pTask, downstreamNodeId); + streamMutexUnlock(&pInfo->lock); + + return send; +} + void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) { *pRecved = taosArrayGetSize(pTask->chkInfo.pActiveInfo->pReadyMsgList); @@ -1169,8 +1190,10 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; int64_t now = taosGetTimestampMs(); + int32_t code = 0; streamMutexLock(&pInfo->lock); + pInfo->dispatchTrigger = true; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; @@ -1178,8 +1201,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); if (px == NULL) { // pause the stream task, if memory not enough - streamMutexUnlock(&pInfo->lock); - return terrno; + code = terrno; } } else { for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { @@ -1191,14 +1213,15 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); if (px == NULL) { // pause the stream task, if memory not enough - streamMutexUnlock(&pInfo->lock); - return terrno; + code = terrno; + break; } } } streamMutexUnlock(&pInfo->lock); - return 0; + + return code; } int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a630296366..318720b5b0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -792,7 +792,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - int32_t code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); + code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); if (code != 0) { stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code)); } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index dde3595eb3..19391bf7a0 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -42,7 +42,7 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); - for (int k = 0; k < numOfExisted; ++k) { + for (int32_t k = 0; k < numOfExisted; ++k) { if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) { return true; } @@ -56,7 +56,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { streamMutexLock(&pTask->lock); int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); - for (int j = 0; j < num; ++j) { + for (int32_t j = 0; j < num; ++j) { SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j); bool exist = existInHbMsg(pMsg, pTaskEpset); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 9943fd1701..9ade5e5638 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -796,7 +796,7 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } - if (cJSON_AddItemToObject(pRoot, "meta", pMeta) != 0) { + if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) { wInfo("vgId:%d, failed to add meta to root", pWal->cfg.vgId); } (void)sprintf(buf, "%" PRId64, pWal->vers.firstVer); @@ -816,13 +816,13 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) { wInfo("vgId:%d, failed to add lastVer to meta", pWal->cfg.vgId); } - if (cJSON_AddItemToObject(pRoot, "files", pFiles) != 0) { + if (!cJSON_AddItemToObject(pRoot, "files", pFiles)) { wInfo("vgId:%d, failed to add files to root", pWal->cfg.vgId); } SWalFileInfo* pData = pWal->fileInfoSet->pData; for (int i = 0; i < sz; i++) { SWalFileInfo* pInfo = &pData[i]; - if (cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject()) != 0) { + if (!cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject())) { wInfo("vgId:%d, failed to add field to files", pWal->cfg.vgId); } if (pField == NULL) { diff --git a/tests/docs-examples-test/c.sh b/tests/docs-examples-test/c.sh new file mode 100644 index 0000000000..54e334b22e --- /dev/null +++ b/tests/docs-examples-test/c.sh @@ -0,0 +1,92 @@ +#!/bin/bash + +pgrep taosd || taosd >> /dev/null 2>&1 & +pgrep taosadapter || taosadapter >> /dev/null 2>&1 & + +GREEN='\033[0;32m' +RED='\033[0;31m' +NC='\033[0m' + +TEST_PATH="../../docs/examples/c" +echo "setting TEST_PATH: $TEST_PATH" + +cd "${TEST_PATH}" || { echo -e "${RED}Failed to change directory to ${TEST_PATH}${NC}"; exit 1; } + +LOG_FILE="docs-c-test-out.log" + +> $LOG_FILE + +make > "$LOG_FILE" 2>&1 + +if [ $? -eq 0 ]; then + echo -e "${GREEN}Make completed successfully.${NC}" +else + echo -e "${RED}Make failed. Check log file: $LOG_FILE${NC}" + cat "$LOG_FILE" + exit 1 +fi + + +declare -a TEST_EXES=( + "connect_example" + "create_db_demo" + "insert_data_demo" + "query_data_demo" + "with_reqid_demo" + "stmt_insert_demo" + "tmq_demo" + "sml_insert_demo" +) + +declare -a NEED_CLEAN=( + "true" + "false" + "false" + "false" + "false" + "false" + "false" + "true" +) + +totalCases=0 +totalFailed=0 +totalSuccess=0 + +for i in "${!TEST_EXES[@]}"; do + TEST_EXE="${TEST_EXES[$i]}" + NEED_CLEAN_FLAG="${NEED_CLEAN[$i]}" + + if [ "$NEED_CLEAN_FLAG" = "true" ]; then + echo "Cleaning database before executing $TEST_EXE..." + taos -s "drop database if exists power" >> $LOG_FILE 2>&1 + fi + + echo "Executing $TEST_EXE..." + ./$TEST_EXE >> $LOG_FILE 2>&1 + RESULT=$? + + if [ "$RESULT" -eq 0 ]; then + totalSuccess=$((totalSuccess + 1)) + echo "[$GREEN OK $NC] $TEST_EXE executed successfully." + else + totalFailed=$((totalFailed + 1)) + echo "[$RED FAILED $NC] $TEST_EXE exited with code $RESULT." + fi + + totalCases=$((totalCases + 1)) +done + +tail -n 40 $LOG_FILE + +echo -e "\nTotal number of cases executed: $totalCases" +if [ "$totalSuccess" -gt "0" ]; then + echo -e "\n${GREEN} ### Total $totalSuccess C case(s) succeed! ### ${NC}" +fi + +if [ "$totalFailed" -ne "0" ]; then + echo -e "\n${RED} ### Total $totalFailed C case(s) failed! ### ${NC}" + exit 1 +fi + +echo "All tests completed." \ No newline at end of file diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index aab0115564..cd3306a994 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1577,6 +1577,7 @@ ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/telnet_tcp.py -R #docs-examples test +,,n,docs-examples-test,bash c.sh ,,n,docs-examples-test,bash python.sh ,,n,docs-examples-test,bash node.sh ,,n,docs-examples-test,bash csharp.sh