diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 9d1142801d..b41633b0a8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -146,7 +146,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, msgtype:%s qtype:%d", pHead->vgId, pMsg, + dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, terrstr(), TMSG_INFO(pMsg->msgType), qtype); return terrno != 0 ? terrno : -1; } diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 85293eff30..941d2c6d72 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -133,7 +133,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb); if (ret < 0) { - metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno)); + metaError("vgId:%d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index dc16c2321b..db71c21615 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -599,7 +599,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { for (int i = 0; i < pSW->number; ++i) { smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i); if (metaGetTableEntryByUid(&mr, smaId) < 0) { - metaWarn("vgId:%d, no entry for tbId: %" PRIi64 ", smaId: %" PRIi64, TD_VID(pMeta->pVnode), uid, smaId); + metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId); continue; } pTSma = pSW->tSma + smaIdx; @@ -647,7 +647,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { SMetaReader mr = {0}; metaReaderInit(&mr, pMeta, 0); if (metaGetTableEntryByUid(&mr, indexUid) < 0) { - metaWarn("vgId:%d, failed to get table entry for smaId: %" PRIi64, TD_VID(pMeta->pVnode), indexUid); + metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid); metaReaderClear(&mr); return NULL; } diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index 0b6a526d8c..1e5b699fce 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -57,12 +57,12 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) { if (metaHandleSmaEntry(pMeta, &me) < 0) goto _err; - metaDebug("vgId:%d, tsma is created, name:%s uid: %" PRId64, TD_VID(pMeta->pVnode), pCfg->indexName, pCfg->indexUid); + metaDebug("vgId:%d, tsma is created, name:%s uid:%" PRId64, TD_VID(pMeta->pVnode), pCfg->indexName, pCfg->indexUid); return 0; _err: - metaError("vgId:%d, failed to create tsma: %s uid: %" PRId64 " since %s", TD_VID(pMeta->pVnode), pCfg->indexName, + metaError("vgId:%d, failed to create tsma:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pCfg->indexName, pCfg->indexUid, tstrerror(terrno)); return -1; } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 7236ef9991..ff455bfe33 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -204,12 +204,12 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ++pMeta->pVnode->config.vndStats.numOfSTables; - metaDebug("vgId:%d, super table is created, name:%s uid: %" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid); + metaDebug("vgId:%d, super table is created, name:%s uid:%" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid); return 0; _err: - metaError("vgId:%d, failed to create super table: %s uid: %" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, + metaError("vgId:%d, failed to create super table:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, pReq->suid, tstrerror(terrno)); return -1; } @@ -996,7 +996,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { tbDbKey.version = pME->version; tbDbKey.uid = pME->uid; - metaDebug("vgId:%d, start to save table version:%" PRId64 "uid: %" PRId64, TD_VID(pMeta->pVnode), pME->version, + metaDebug("vgId:%d, start to save table version:%" PRId64 " uid:%" PRId64, TD_VID(pMeta->pVnode), pME->version, pME->uid); pKey = &tbDbKey; @@ -1031,7 +1031,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { return 0; _err: - metaError("vgId:%d, failed to save table version:%" PRId64 "uid: %" PRId64 " %s", TD_VID(pMeta->pVnode), pME->version, + metaError("vgId:%d, failed to save table version:%" PRId64 "uid:%" PRId64 " %s", TD_VID(pMeta->pVnode), pME->version, pME->uid, tstrerror(terrno)); taosMemoryFree(pVal); diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 3c832e9c2a..be2828d187 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -71,7 +71,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee goto _err; } - tsdbDebug("vgId:%d, tsdb is opened for %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, + tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2); *ppTsdb = pTsdb; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bd1b2db145..265e20e55e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1197,7 +1197,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* setComposedBlockFlag(pReader, true); double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; - tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64 + tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64 " - %" PRId64 " %s", pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr); @@ -2647,7 +2647,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl return code; _err: - tsdbError("failed to create data reader, code: %s %s", tstrerror(code), pReader->idStr); + tsdbError("failed to create data reader, code:%s %s", tstrerror(code), pReader->idStr); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index d1584f701e..0623b3bd10 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -40,7 +40,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { pVnode->pPool = pPool; } - vDebug("vgId:%d, vnode buffer pool is opened, pool size: %" PRId64, TD_VID(pVnode), size); + vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e8d029a88f..650c98eb88 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -95,19 +95,19 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { // save info to a vnode_tmp.json pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - vError("failed to open info file: %s for write: %s", fname, terrstr()); + vError("failed to open info file:%s for write:%s", fname, terrstr()); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (taosWriteFile(pFile, data, strlen(data)) < 0) { - vError("failed to write info file: %s data: %s", fname, terrstr()); + vError("failed to write info file:%s data:%s", fname, terrstr()); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } if (taosFsyncFile(pFile) < 0) { - vError("failed to fsync info file: %s error: %s", fname, terrstr()); + vError("failed to fsync info file:%s error:%s", fname, terrstr()); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -117,7 +117,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { // free info binary taosMemoryFree(data); - vInfo("vgId:%d, vnode info is saved, fname: %s", pInfo->config.vgId, fname); + vInfo("vgId:%d, vnode info is saved, fname:%s", pInfo->config.vgId, fname); return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 69f87a2c1a..314c0e7390 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -55,7 +55,7 @@ int vnodeInit(int nthreads) { vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread)); if (vnodeGlobal.threads == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - vError("failed to init vnode module since: %s", tstrerror(terrno)); + vError("failed to init vnode module since:%s", tstrerror(terrno)); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index e273a41f48..1ba74ac3be 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -23,13 +23,13 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { // check config if (vnodeCheckCfg(pCfg) < 0) { - vError("vgId:%d, failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno)); + vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(terrno)); return -1; } // create vnode env if (tfsMkdirAt(pTfs, path, (SDiskID){0}) < 0) { - vError("vgId:%d, failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno)); + vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(terrno)); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 16bcb967cd..f49802d69b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -262,7 +262,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp return 0; _err: - vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), + vError("vgId:%d, process %s request failed since %s, version:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), tstrerror(terrno), version); return -1; } @@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { } int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { - vTrace("message in fetch queue is processing"); + vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && !vnodeIsLeader(pVnode)) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 99173e87d7..5c5163eb41 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -518,15 +518,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c rpcMsg.info.conn.applyTerm = cbMeta.term; vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 - ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", + ", weak:%d, code:%d, state:%d %s, type:%s", syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak, - cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); + cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; - vError("vgId:%d, sync commit error, msgtype:%d,%s, index:%ld, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), - pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code)); + vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), + TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code)); if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } @@ -537,10 +537,9 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { if (cbMeta.isWeak == 1) { SVnode *pVnode = pFsm->data; - vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 - ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", + vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, - syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); + syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); if (cbMeta.code == 0) { SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; @@ -552,8 +551,8 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; - vError("vgId:%d, sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), - pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code)); + vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync), + TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code)); if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } @@ -563,9 +562,9 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SVnode *pVnode = pFsm->data; - vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", + vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, - syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); + syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); } #define USE_TSDB_SNAPSHOT diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f16b485240..16075ec1f1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -224,6 +224,7 @@ typedef struct SOperatorInfo { struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator SOperatorFpSet fpSet; + int16_t resultDataBlockId; } SOperatorInfo; typedef enum { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6804a1258c..cb31137b20 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3934,6 +3934,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t type = nodeType(pPhyNode); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { + SOperatorInfo* pOperator = NULL; if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; @@ -3951,11 +3952,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; - return pOperator; - } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; int32_t code = @@ -3972,14 +3971,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); + pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; - return pOperator; - } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { - return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); + pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { @@ -4001,12 +3998,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo #endif pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); - return pOperator; - + pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; - return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); + pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo); @@ -4015,7 +4010,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); @@ -4041,7 +4036,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, ""); cleanupQueryTableDataCond(&cond); - return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); + pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; @@ -4058,12 +4053,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); + pOperator = createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { - return createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); + pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else { ASSERT(0); } + pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; + return pOperator; } int32_t num = 0; @@ -4075,6 +4072,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser); if (ops[i] == NULL) { return NULL; + } else { + ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId; } } @@ -4208,8 +4207,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else { ASSERT(0); } - taosMemoryFree(ops); + + pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; return pOptr; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 4134ce5dbf..e96844f1e3 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -26,7 +26,34 @@ static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); -static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode); +static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode); + +static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode) { + SNode* pMergeCondition = pJoinNode->pMergeCondition; + if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { + SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; + SColumnNode* col1 = (SColumnNode*)pNode->pLeft; + SColumnNode* col2 = (SColumnNode*)pNode->pRight; + SColumnNode* leftTsCol = NULL; + SColumnNode* rightTsCol = NULL; + if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) { + ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId); + leftTsCol = col1; + rightTsCol = col2; + } else { + ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId); + ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId); + leftTsCol = col2; + rightTsCol = col1; + } + setJoinColumnInfo(&pInfo->leftCol, leftTsCol); + setJoinColumnInfo(&pInfo->rightCol, rightTsCol); + } else { + ASSERT(false); + } +} SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { @@ -53,14 +80,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - SNode* pMergeCondition = pJoinNode->pMergeCondition; - if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { - SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; - setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); - setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); - } else { - ASSERT(false); - } + extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode); if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); @@ -173,16 +193,16 @@ static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotI } SSDataBlock* block = pBlock; - bool createdNewBlock = false; + bool createdNewBlock = false; if (endPos == numRows) { - block = blockDataExtractBlock(pBlock, startPos, endPos-startPos); + block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); taosArrayPush(createdBlocks, &block); createdNewBlock = true; } SRowLocation location = {0}; for (int32_t j = startPos; j < endPos; ++j) { location.pDataBlock = block; - location.pos = ( createdNewBlock ? j - startPos : j); + location.pos = (createdNewBlock ? j - startPos : j); taosArrayPush(rowLocations, &location); } return 0; @@ -367,17 +387,3 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { } return (pRes->info.rows > 0) ? pRes : NULL; } - -static void extractTimeCondition(SJoinOperatorInfo* pInfo, SLogicConditionNode* pLogicConditionNode) { - int32_t len = LIST_LENGTH(pLogicConditionNode->pParameterList); - - for (int32_t i = 0; i < len; ++i) { - SNode* pNode = nodesListGetNode(pLogicConditionNode->pParameterList, i); - if (nodeType(pNode) == QUERY_NODE_OPERATOR) { - SOperatorNode* pn1 = (SOperatorNode*)pNode; - setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pn1->pLeft); - setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pn1->pRight); - break; - } - } -} diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 2468dca86c..d61cd871ee 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -220,7 +220,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { char buf[128] = {0}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; int32_t sz = idxSerialCacheKey(&key, buf); - indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType); + indexDebug("w suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType); IndexCache** cache = taosHashGet(index->colObj, buf, sz); assert(*cache != NULL); @@ -395,7 +395,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) char buf[128] = {0}; ICacheKey key = { .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType}; - indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType); + indexDebug("r suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType); int32_t sz = idxSerialCacheKey(&key, buf); taosThreadMutexLock(&sIdx->mtx); diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index b4e59d0f63..0a47fc0f16 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -141,7 +141,7 @@ void tfileCacheDestroy(TFileCache* tcache) { TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); while (reader) { TFileReader* p = *reader; - indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, + indexInfo("drop table cache suid:%" PRIu64 ", colName:%s, colType:%d", p->header.suid, p->header.colName, p->header.colType); tfileReaderUnRef(p); reader = taosHashIterate(tcache->tableCache, reader); @@ -185,20 +185,20 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) { reader->ctx = ctx; if (0 != tfileReaderVerify(reader)) { - indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); + indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName); tfileReaderDestroy(reader); return NULL; } // T_REF_INC(reader); if (0 != tfileReaderLoadHeader(reader)) { - indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, + indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName); tfileReaderDestroy(reader); return NULL; } if (0 != tfileReaderLoadFst(reader)) { - indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s, errno: %d", reader->header.suid, + indexError("failed to load index fst, suid:%" PRIu64 ", colName:%s, code:0x%x", reader->header.suid, reader->header.colName, errno); tfileReaderDestroy(reader); return NULL; @@ -874,7 +874,7 @@ static int tfileReaderLoadHeader(TFileReader* reader) { int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); if (nread == -1) { - indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno, + indexError("actual Read: %d, to read: %d, code:0x%x, filename: %s", (int)(nread), (int)sizeof(buf), errno, reader->ctx->file.buf); } else { indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index c02ef64707..e6f7c4ceb8 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1693,22 +1693,30 @@ static EDealRes eliminateProjOptCanUseNewChildTargetsImpl(SNode* pNode, void* pC CheckNewChildTargetsCxt* pCxt = pContext; SNode* pTarget = NULL; FOREACH(pTarget, pCxt->pNewChildTargets) { - if (!nodesEqualNode(pTarget, pNode)) { - pCxt->canUse = false; - return DEAL_RES_END; + if (nodesEqualNode(pTarget, pNode)) { + pCxt->canUse = true; + return DEAL_RES_CONTINUE; } } + pCxt->canUse = false; + return DEAL_RES_END; } return DEAL_RES_CONTINUE; } -static bool eliminateProjOptCanUseNewChildTargets(SLogicNode* pChild, SNodeList* pNewChildTargets) { - if (NULL == pChild->pConditions) { - return true; +static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild, SNodeList* pNewChildTargets) { + if (NULL != pChild->pConditions) { + CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false}; + nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); + if (!cxt.canUse) return false; } - CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = true}; - nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); - return cxt.canUse; + if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && NULL != ((SJoinLogicNode*)pChild)->pOnConditions) { + SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild; + CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false}; + nodesWalkExpr(pJoinLogicNode->pOnConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); + if (!cxt.canUse) return false; + } + return true; } static void alignProjectionWithTarget(SLogicNode* pNode) { @@ -1748,7 +1756,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* } } } - if (eliminateProjOptCanUseNewChildTargets(pChild, pNewChildTargets)) { + if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets)) { nodesDestroyList(pChild->pTargets); pChild->pTargets = pNewChildTargets; } else { @@ -1760,6 +1768,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* if (TSDB_CODE_SUCCESS == code) { NODES_CLEAR_LIST(pProjectNode->node.pChildren); nodesDestroyNode((SNode*)pProjectNode); + //if pChild is a project logic node, remove its projection which is not reference by its target. alignProjectionWithTarget(pChild); } pCxt->optimized = true; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4e3c9bf73d..1b802b9e5b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -94,7 +94,7 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) { return -1; } - sDebug("vgId:%d, rid:%" PRId64 " is added to rsetId:%" PRId64, pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId); + sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%" PRId64, pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId); return pSyncNode->rid; } @@ -142,7 +142,7 @@ void syncStop(int64_t rid) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosRemoveRef(tsNodeRefId, rid); - sDebug("vgId:%d, rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId); + sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId); } int32_t syncSetStandby(int64_t rid) { @@ -730,8 +730,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs for (int i = 0; i < arrSize; ++i) { do { char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d, batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), - pMsgPArr[i]->msgType, arrSize); + snprintf(eventLog, sizeof(eventLog), "propose type:%s, batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), arrSize); syncNodeEventLog(pSyncNode, eventLog); } while (0); @@ -791,7 +790,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { do { char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType); + snprintf(eventLog, sizeof(eventLog), "propose type:%s", TMSG_INFO(pMsg->msgType)); syncNodeEventLog(pSyncNode, eventLog); } while (0); @@ -799,7 +798,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) { ret = -1; terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; - sError("vgId:%d, sync propose not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType); + sError("vgId:%d, failed to sync propose since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); goto _END; } @@ -808,8 +807,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { if (!syncNodeCanChange(pSyncNode)) { ret = -1; terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY; - sError("vgId:%d, sync reconfig not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), - pMsg->msgType); + sError("vgId:%d, failed to sync reconfig since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); goto _END; } @@ -836,13 +834,12 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { rpcFreeCont(rpcMsg.pCont); syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); ret = 1; - sDebug("vgId:%d, optimized index:%" PRId64 " success, msgtype:%s,%d", pSyncNode->vgId, retIndex, - TMSG_INFO(pMsg->msgType), pMsg->msgType); + sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); } else { ret = -1; terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("vgId:%d, optimized index:%" PRId64 " error, msgtype:%s,%d", pSyncNode->vgId, retIndex, - TMSG_INFO(pMsg->msgType), pMsg->msgType); + sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, + TMSG_INFO(pMsg->msgType)); } } else { @@ -851,7 +848,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } else { ret = -1; terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId); + sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId); } } @@ -861,8 +858,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } else { ret = -1; terrno = TSDB_CODE_SYN_NOT_LEADER; - sError("vgId:%d, sync propose not leader, %s, msgtype:%s,%d", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), TMSG_INFO(pMsg->msgType), pMsg->msgType); + sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + TMSG_INFO(pMsg->msgType)); goto _END; } @@ -887,7 +884,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { } } - snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); + snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP); if (!taosCheckExistFile(pSyncNode->configPath)) { // create a new raft config file SRaftCfgMeta meta; @@ -910,8 +907,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); - snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path); - snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); + snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path, + TD_DIRSEP); + snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP); pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->msgcb = pSyncInfo->msgcb; @@ -2764,7 +2762,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ESyncState state = flag; char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex); + snprintf(eventLog, sizeof(eventLog), "commit wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex); syncNodeEventLog(ths, eventLog); // execute fsm @@ -2782,13 +2780,13 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, // user commit if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) { bool internalExecute = true; - if ((ths->replicaNum == 1) && ths->restoreFinish && (ths->vgId != 1)) { + if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) { internalExecute = false; } do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "index:%" PRId64 ", internalExecute:%d", i, internalExecute); + snprintf(logBuf, sizeof(logBuf), "commit index:%" PRId64 ", internal:%d", i, internalExecute); syncNodeEventLog(ths, logBuf); } while (0); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 36be371213..0e17c1db80 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -229,8 +229,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr do { char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index, + TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); syncNodeEventLog(pData->pSyncNode, eventLog); } while (0); @@ -468,8 +468,8 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { do { char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "write2 index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + snprintf(eventLog, sizeof(eventLog), "write2 index:%" PRId64 ", type:%s, origin type:%s", pEntry->index, + TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); syncNodeEventLog(pData->pSyncNode, eventLog); } while (0); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 501f1fb435..db4c1e0ba5 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -50,9 +50,8 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { SSyncNode *pSyncNode = pObj->data; char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "resp mgr add, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p", - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + snprintf(eventLog, sizeof(eventLog), "save response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p", + TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); syncNodeEventLog(pSyncNode, eventLog); taosThreadMutexUnlock(&(pObj->mutex)); @@ -77,9 +76,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { SSyncNode *pSyncNode = pObj->data; char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "resp mgr get, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p", - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + snprintf(eventLog, sizeof(eventLog), "get response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p", + TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); syncNodeEventLog(pSyncNode, eventLog); taosThreadMutexUnlock(&(pObj->mutex)); @@ -98,9 +96,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu SSyncNode *pSyncNode = pObj->data; char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "resp mgr get-and-del, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p", - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + snprintf(eventLog, sizeof(eventLog), "get-and-del response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p", + TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); syncNodeEventLog(pSyncNode, eventLog); taosHashRemove(pObj->pRespHash, &index, sizeof(index)); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 3275774cce..63c414386b 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -93,7 +93,7 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // // We specified a non-zero wait. Time must advance. // if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime) // { - // printf("nanoseconds: %d, rc: %d, errno: %d. before filetime: %d, %d; after filetime: %d, %d\n", + // printf("nanoseconds: %d, rc: %d, code:0x%x. before filetime: %d, %d; after filetime: %d, %d\n", // nanosecs, rc, errno, // (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime, // (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 82e9c1e14d..882a4b6d91 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -131,7 +131,7 @@ ./test.sh -f tsim/parser/insert_tb.sim # TD-17038 ./test.sh -f tsim/parser/interp.sim ./test.sh -f tsim/parser/join_manyblocks.sim -# TD-18018 ./test.sh -f tsim/parser/join_multitables.sim +./test.sh -f tsim/parser/join_multitables.sim ./test.sh -f tsim/parser/join_multivnode.sim ./test.sh -f tsim/parser/join.sim ./test.sh -f tsim/parser/last_cache.sim diff --git a/tests/script/tmp/r1.sim b/tests/script/tmp/r1.sim new file mode 100644 index 0000000000..3fc875ad23 --- /dev/null +++ b/tests/script/tmp/r1.sim @@ -0,0 +1,47 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sql connect + +print =============== step1: create dnodes +sql create dnode $hostname port 7200 + +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> rows: $rows +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 2 then + return -1 +endi +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi + +print =============== step2: create database +sql create database db vgroups 1 replica 1 +sql show databases +if $rows != 3 then + return -1 +endi + +sql use db; +sql create table stb (ts timestamp, c int) tags (t int); +sql create table t0 using stb tags (0); +sql insert into t0 values(now, 1); +sql insert into t0 values(now+1s, 1); + +return diff --git a/tests/script/tsim/parser/join_multitables.sim b/tests/script/tsim/parser/join_multitables.sim index 6c5138f8cb..4278be52f3 100644 --- a/tests/script/tsim/parser/join_multitables.sim +++ b/tests/script/tsim/parser/join_multitables.sim @@ -682,7 +682,7 @@ if $data08 != 3 then return -1 endi -sql select st0.f1,st1.f1 from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1; +sql select st0.f1,st1.f1 from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 order by st0.f1; if $rows != 25 then return -1 endi @@ -721,22 +721,10 @@ endi if $data01 != @21-03-01 01:00:00.000@ then return -1 endi -if $data10 != @21-03-02 01:00:00.000@ then +if $data50 != @21-03-02 01:00:00.000@ then return -1 endi -if $data11 != @21-03-02 01:00:00.000@ then - return -1 -endi -if $data20 != @21-03-03 01:00:00.000@ then - return -1 -endi -if $data21 != @21-03-03 01:00:00.000@ then - return -1 -endi -if $data30 != @21-03-04 01:00:00.000@ then - return -1 -endi -if $data31 != @21-03-04 01:00:00.000@ then +if $data51 != @21-03-02 01:00:00.000@ then return -1 endi @@ -782,23 +770,23 @@ endi if $data04 != 01 then return -1 endi -if $data10 != @21-03-02 01:00:00.000@ then +if $data50 != @21-03-02 01:00:00.000@ then return -1 endi -if $data11 != 9901.000000000 then +if $data51 != 9901.000000000 then return -1 endi -if $data12 != 11 then +if $data52 != 11 then return -1 endi -if $data13 != 9911.000000000 then +if $data53 != 9911.000000000 then return -1 endi -if $data14 != 01 then +if $data54 != 01 then return -1 endi -sql select last(*) from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 interval(10a); +sql select _wstart, last(*) from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 interval(10a); if $rows != 25 then return -1 endi @@ -830,7 +818,7 @@ if $data08 != 11 then return -1 endi -sql select last(*) from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 interval(1d) sliding(1d); +sql select _wstart, last(*) from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 interval(1d) sliding(1d); if $rows != 5 then return -1 endi @@ -937,7 +925,7 @@ sql select st0.*,st1.* from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts and if $rows != 5 then return -1 endi -if $data00 != @21-03-01 01:00:00.000@ then +if $data00 != @21-03-02 01:00:00.000@ then print $data00 return -1 endi @@ -965,7 +953,7 @@ endi if $data08 != 3 then return -1 endi -if $data09 != @21-03-01 01:00:00.000@ then +if $data09 != @21-03-02 01:00:00.000@ then return -1 endi @@ -973,38 +961,23 @@ sql select top(st1.f1, 5) from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts if $rows != 5 then return -1 endi -if $data00 != @21-03-01 05:00:00.000@ then - return -1 -endi -if $data01 != 9915 then +if $data00 != 9915 then return -1 endi -if $data10 != @21-03-02 05:00:00.000@ then - return -1 -endi -if $data11 != 9915 then +if $data10 != 9915 then return -1 endi -if $data20 != @21-03-03 05:00:00.000@ then - return -1 -endi -if $data21 != 9915 then +if $data20 != 9915 then return -1 endi -if $data30 != @21-03-04 05:00:00.000@ then - return -1 -endi -if $data31 != 9915 then +if $data30 != 9915 then return -1 endi -if $data40 != @21-03-05 05:00:00.000@ then - return -1 -endi -if $data41 != 9915 then +if $data40 != 9915 then return -1 endi -sql select top(st0.f1,5) from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts and st1.ts=st0.ts and st0.id1=st1.id1; +sql select st0.ts, top(st0.f1,5) from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts and st1.ts=st0.ts and st0.id1=st1.id1 order by st0.ts; if $rows != 5 then return -1 endi @@ -1329,25 +1302,25 @@ if $data09 != 9925 then endi sql_error select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.f1=tb1_1.f1; -sql_error select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.ts=tb1_1.ts and tb0_1.id1=tb1_1.id2; -sql_error select tb0_5.*, tb1_5.*,tb2_5.*,tb3_5.*,tb4_5.*,tb5_5.*, tb6_5.*,tb7_5.*,tb8_5.*,tb9_5.*,tba_5.* from tb0_5, tb1_5, tb2_5, tb3_5, tb4_5,tb5_5, tb6_5, tb7_5, tb8_5, tb9_5, tba_5 where tb9_5.ts=tb8_5.ts and tb8_5.ts=tb7_5.ts and tb7_5.ts=tb6_5.ts and tb6_5.ts=tb5_5.ts and tb5_5.ts=tb4_5.ts and tb4_5.ts=tb3_5.ts and tb3_5.ts=tb2_5.ts and tb2_5.ts=tb1_5.ts and tb1_5.ts=tb0_5.ts and tb0_5.ts=tba_5.ts; +sql select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.ts=tb1_1.ts and tb0_1.id1=tb1_1.id2; +sql select tb0_5.*, tb1_5.*,tb2_5.*,tb3_5.*,tb4_5.*,tb5_5.*, tb6_5.*,tb7_5.*,tb8_5.*,tb9_5.*,tba_5.* from tb0_5, tb1_5, tb2_5, tb3_5, tb4_5,tb5_5, tb6_5, tb7_5, tb8_5, tb9_5, tba_5 where tb9_5.ts=tb8_5.ts and tb8_5.ts=tb7_5.ts and tb7_5.ts=tb6_5.ts and tb6_5.ts=tb5_5.ts and tb5_5.ts=tb4_5.ts and tb4_5.ts=tb3_5.ts and tb3_5.ts=tb2_5.ts and tb2_5.ts=tb1_5.ts and tb1_5.ts=tb0_5.ts and tb0_5.ts=tba_5.ts; -sql_error select * from st0, st1 where st0.ts=st1.ts; +sql select * from st0, st1 where st0.ts=st1.ts; sql_error select * from st0, st1 where st0.id1=st1.id1; sql_error select * from st0, st1 where st0.f1=st1.f1 and st0.id1=st1.id1; -sql_error select * from st0, st1, st2, st3 where st0.id1=st1.id1 and st2.id1=st3.id1 and st0.ts=st1.ts and st1.ts=st2.ts and st2.ts=st3.ts; +sql select * from st0, st1, st2, st3 where st0.id1=st1.id1 and st2.id1=st3.id1 and st0.ts=st1.ts and st1.ts=st2.ts and st2.ts=st3.ts; sql_error select * from st0, st1, st2 where st0.id1=st1.id1; sql_error select * from st0, st1 where st0.id1=st1.id1 and st0.id2=st1.id3; sql_error select * from st0, st1 where st0.id1=st1.id1 or st0.ts=st1.ts; sql_error select * from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 or st0.id2=st1.id2; sql_error select * from st0, st1, st2 where st0.ts=st1.ts and st0.id1=st1.id1; sql_error select * from st0, st1 where st0.id1=st1.ts and st0.ts=st1.id1; -sql_error select * from st0, st1 where st0.id1=st1.id2 and st0.ts=st1.ts; -sql_error select * from st0, st1 where st1.id4=st0.id4 and st1.ts=st0.ts; -sql_error select * from st0, st1 where st0.id1=st1.id2 and st1.ts=st0.ts; +sql select * from st0, st1 where st0.id1=st1.id2 and st0.ts=st1.ts; +sql select * from st0, st1 where st1.id4=st0.id4 and st1.ts=st0.ts; +sql select * from st0, st1 where st0.id1=st1.id2 and st1.ts=st0.ts; sql_error select * from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 interval 10a; sql_error select last(*) from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 group by f1; sql_error select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9 where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1; -sql_error select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9,sta where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.ts=st1.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1 and st0.id1=st1.id1 and st0.id1=sta.id1 and st0.ts=sta.ts; +sql select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9,sta where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.ts=st1.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1 and st0.id1=st1.id1 and st0.id1=sta.id1 and st0.ts=sta.ts; system sh/exec.sh -n dnode1 -s stop -x SIGINT