From 0e8aaf4c9e6ee7a13941bf7bbcdd39e85aec33d4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 28 Sep 2022 10:38:49 +0800 Subject: [PATCH 01/21] feat(tmq): push optimization --- include/libs/executor/executor.h | 14 ++-- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/tq.h | 18 ++++- source/dnode/vnode/src/tq/tq.c | 96 +++++++++++++++++++++++-- source/dnode/vnode/src/tq/tqExec.c | 6 +- source/dnode/vnode/src/tq/tqPush.c | 74 +++++++++++++++++++ source/dnode/vnode/src/tq/tqRead.c | 63 ++++++++-------- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executor.c | 10 ++- source/libs/executor/src/scanoperator.c | 35 +++++++++ 11 files changed, 271 insertions(+), 52 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 8c1d957381..1049599a56 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,13 +29,13 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); +typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); typedef struct { - void *handle; + void* handle; bool localExec; localFetchFp fp; - SArray *explainRes; + SArray* explainRes; } SLocalFetch; typedef struct { @@ -51,9 +51,9 @@ typedef struct { bool initTqReader; int32_t numOfVgroups; - void* sContext; // SSnapContext* + void* sContext; // SSnapContext* - void* pStateBackend; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg @@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** @@ -195,6 +195,8 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); +int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq); + int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6ba10641f5..f0fb8d4b02 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -217,7 +217,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver); int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret); -int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver); +int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c3441a43f0..04b0813445 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -113,10 +113,20 @@ typedef struct { } STqHandle; +typedef struct { + SMqDataRsp dataRsp; + SMqRspHead rspHead; + STqHandle* pHandle; + SRpcHandleInfo pInfo; +} STqPushEntry; + struct STQ { - SVnode* pVnode; - char* path; - SHashObj* pPushMgr; // consumerId -> STqHandle* + SVnode* pVnode; + char* path; + + SRWLatch pushLock; + + SHashObj* pPushMgr; // consumerId -> STqPushEntry SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo @@ -146,7 +156,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); +int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 76f3c1b12d..21136405cb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -78,7 +78,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); - pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + taosInitRWLatch(&pTq->pushLock); + pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pTq->pPushMgr, taosMemoryFree); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -153,6 +155,65 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, return 0; } +int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { + SMqDataRsp* pRsp = &pPushEntry->dataRsp; + + ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); + + ASSERT(!pRsp->withSchema); + ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + + if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { + if (pRsp->blockNum > 0) { + ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + } else { + ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + } + } + + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); + + if (code < 0) { + return -1; + } + + int32_t tlen = sizeof(SMqRspHead) + len; + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + memcpy(buf, &pPushEntry->rspHead, sizeof(SMqRspHead)); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, abuf, len); + tEncodeSMqDataRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg rsp = { + .info = pPushEntry->pInfo, + .pCont = buf, + .contLen = tlen, + .code = 0, + }; + + tmsgSendRsp(&rsp); + + char buf1[80] = {0}; + char buf2[80] = {0}; + tFormatOffset(buf1, 80, &pRsp->reqOffset); + tFormatOffset(buf2, 80, &pRsp->rspOffset); + tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", + TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + + return 0; +} + int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); @@ -477,11 +538,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + // lock + taosWLockLatch(&pTq->pushLock); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); #if 1 - + if (dataRsp.blockNum == 0) { + STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); + if (pPushEntry != NULL) { + pPushEntry->pHandle = pHandle; + pPushEntry->pInfo = pMsg->info; + memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); + pPushEntry->rspHead.consumerId = consumerId; + pPushEntry->rspHead.epoch = reqEpoch; + pPushEntry->rspHead.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, + TD_VID(pTq->pVnode)); + // unlock + taosWUnLockLatch(&pTq->pushLock); + return 0; + } + } + taosWUnLockLatch(&pTq->pushLock); #endif + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } @@ -615,9 +696,14 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); - ASSERT(code == 0); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + } - tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); + code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such offset", pReq->subKey); + } if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { ASSERT(0); @@ -756,7 +842,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe atomic_add_fetch_32(&pHandle->epoch, 1); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { // TODO + ASSERT(0); } + // close handle } return 0; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index a24f920235..58d051bec1 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -15,7 +15,7 @@ #include "tq.h" -static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; @@ -243,7 +243,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp } if (pHandle->fetchMeta) { SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); + int32_t schemaLen = htonl(pBlk->schemaLen); if (schemaLen > 0) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); @@ -278,7 +278,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp } if (pHandle->fetchMeta) { SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); + int32_t schemaLen = htonl(pBlk->schemaLen); if (schemaLen > 0) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index a57e8174fe..c42cfeb7b8 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,6 +213,80 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { + if (msgType == TDMT_VND_SUBMIT) { + // lock push mgr to avoid potential msg lost + taosWLockLatch(&pTq->pushLock); + if (taosHashGetSize(pTq->pPushMgr) != 0) { + SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); + SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); + void* data = taosMemoryMalloc(msgLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to copy data for stream since out of memory"); + return -1; + } + memcpy(data, msg, msgLen); + SSubmitReq* pReq = (SSubmitReq*)data; + pReq->version = ver; + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pPushMgr, pIter); + if (pIter == NULL) break; + STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; + STqExecHandle* pExec = &pPushEntry->pHandle->execHandle; + qTaskInfo_t task = pExec->task; + SMqDataRsp* pRsp = &pPushEntry->dataRsp; + + // prepare scan mem data + qStreamScanMemData(task, pReq); + + // exec + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(task, NULL, &ts) < 0) { + ASSERT(0); + } + + if (pDataBlock == NULL) { + break; + } + + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + pRsp->blockNum++; + } + if (pRsp->blockNum > 0) { + // set offset + tqOffsetResetToLog(&pRsp->rspOffset, ver); + // remove from hash + size_t kLen; + void* key = taosHashGetKey(pPushEntry, &kLen); + void* keyCopy = taosMemoryMalloc(kLen); + memcpy(keyCopy, key, kLen); + + taosArrayPush(cachedKeys, &keyCopy); + taosArrayPush(cachedKeyLens, &kLen); + + if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { + ASSERT(0); + } + tqPushDataRsp(pTq, pPushEntry); + } + } + // delete entry + for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { + void* key = taosArrayGetP(cachedKeys, i); + size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); + taosHashRemove(pTq->pPushMgr, key, kLen); + } + taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); + taosArrayDestroy(cachedKeyLens); + } + // unlock + taosWUnLockLatch(&pTq->pushLock); + } + if (vnodeIsRoleLeader(pTq->pVnode)) { if (msgType == TDMT_VND_SUBMIT) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 8e5f328efa..d93a8fe030 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -15,21 +15,20 @@ #include "tq.h" - -bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ - if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){ +bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) { return true; } - int16_t msgType = pHead->msgType; - char* body = pHead->body; - int32_t bodyLen = pHead->bodyLen; + int16_t msgType = pHead->msgType; + char* body = pHead->body; + int32_t bodyLen = pHead->bodyLen; - int64_t tbSuid = pHandle->execHandle.execTb.suid; - int64_t realTbSuid = 0; - SDecoder coder; - void* data = POINTER_SHIFT(body, sizeof(SMsgHead)); - int32_t len = bodyLen - sizeof(SMsgHead); + int64_t tbSuid = pHandle->execHandle.execTb.suid; + int64_t realTbSuid = 0; + SDecoder coder; + void* data = POINTER_SHIFT(body, sizeof(SMsgHead)); + int32_t len = bodyLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) { @@ -43,38 +42,38 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ if (tDecodeSVDropStbReq(&coder, &req) < 0) { goto end; } - realTbSuid = req.suid; + realTbSuid = req.suid; } else if (msgType == TDMT_VND_CREATE_TABLE) { SVCreateTbBatchReq req = {0}; if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { goto end; } - int32_t needRebuild = 0; + int32_t needRebuild = 0; SVCreateTbReq* pCreateReq = NULL; for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; - if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ + if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) { needRebuild++; } } - if(needRebuild == 0){ + if (needRebuild == 0) { // do nothing - }else if(needRebuild == req.nReqs){ + } else if (needRebuild == req.nReqs) { realTbSuid = tbSuid; - }else{ + } else { realTbSuid = tbSuid; SVCreateTbBatchReq reqNew = {0}; reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; - if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ + if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) { reqNew.nReqs++; taosArrayPush(reqNew.pArray, pCreateReq); } } - int tlen; + int tlen; int32_t ret = 0; tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret); void* buf = taosMemoryMalloc(tlen); @@ -107,7 +106,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ } } } else if (msgType == TDMT_VND_ALTER_TABLE) { - SVAlterTbReq req = {0}; + SVAlterTbReq req = {0}; if (tDecodeSVAlterTbReq(&coder, &req) < 0) { goto end; @@ -129,32 +128,32 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ goto end; } - int32_t needRebuild = 0; + int32_t needRebuild = 0; SVDropTbReq* pDropReq = NULL; for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; - if(pDropReq->suid == tbSuid){ + if (pDropReq->suid == tbSuid) { needRebuild++; } } - if(needRebuild == 0){ + if (needRebuild == 0) { // do nothing - }else if(needRebuild == req.nReqs){ + } else if (needRebuild == req.nReqs) { realTbSuid = tbSuid; - }else{ + } else { realTbSuid = tbSuid; SVDropTbBatchReq reqNew = {0}; reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; - if(pDropReq->suid == tbSuid){ + if (pDropReq->suid == tbSuid) { reqNew.nReqs++; taosArrayPush(reqNew.pArray, pDropReq); } } - int tlen; + int tlen; int32_t ret = 0; tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret); void* buf = taosMemoryMalloc(tlen); @@ -177,11 +176,11 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ goto end; } realTbSuid = req.suid; - } else{ + } else { ASSERT(0); } - end: +end: tDecoderClear(&coder); return tbSuid == realTbSuid; } @@ -224,7 +223,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea code = -1; goto END; } - if(isValValidForTable(pHandle, pHead)){ + if (isValValidForTable(pHandle, pHead)) { *fetchOffset = offset; code = 0; goto END; @@ -241,7 +240,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea offset++; } } - END: +END: taosThreadMutexUnlock(&pHandle->pWalReader->mutex); return code; } @@ -348,7 +347,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { } } -int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) { +int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) { pReader->pMsg = pMsg; if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6e9eba306a..b62a4dfbdc 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -808,7 +808,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; SSubmitBlk *pBlock; - SSubmitRsp rsp = {0}; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; int32_t nRows; @@ -921,7 +920,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } if (taosArrayGetSize(newTbUids) > 0) { - vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); + vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), + (int32_t)taosArrayGetSize(newTbUids)); } tqUpdateTbUidList(pVnode->pTq, newTbUids, true); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dc98b05479..f47855c2d0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -145,6 +145,7 @@ typedef struct { SMqMetaRsp metaRsp; // for tmq fetching meta int8_t returned; int64_t snapshotVer; + const SSubmitReq* pReq; SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 373cb451f4..4db9338bd9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -486,7 +486,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL if (pLocal) { memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); } - + taosArrayClearEx(pResList, freeBlock); int64_t curOwner = 0; @@ -773,6 +773,14 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s return TSDB_CODE_SUCCESS; } +int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); + ASSERT(pTaskInfo->streamInfo.pReq == NULL); + pTaskInfo->streamInfo.pReq = pReq; + return 0; +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c5c33ae29..5f54d56ca7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1430,6 +1430,41 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SStreamScanInfo* pInfo = pOperator->info; qDebug("queue scan called"); + + if (pTaskInfo->streamInfo.pReq != NULL) { + if (pInfo->tqReader->pMsg == NULL) { + pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq; + const SSubmitReq* pSubmit = pInfo->tqReader->pMsg; + if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) { + qError("submit msg messed up when initing stream submit block %p", pSubmit); + pInfo->tqReader->pMsg = NULL; + ASSERT(0); + } + } + + blockDataCleanup(pInfo->pRes); + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + + while (tqNextDataBlock(pInfo->tqReader)) { + SSDataBlock block = {0}; + + int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); + + if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { + continue; + } + + setBlockIntoRes(pInfo, &block); + + if (pBlockInfo->rows > 0) { + return pInfo->pRes; + } + } + + pInfo->tqReader->pMsg = NULL; + pTaskInfo->streamInfo.pReq = NULL; + } + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { From 01c94a775b6779e501454cddef459cc958c12a62 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 28 Sep 2022 14:17:26 +0800 Subject: [PATCH 02/21] feat(tmq): push optimization --- source/client/src/clientTmq.c | 18 +++++++++--------- source/dnode/mnode/impl/src/mndConsumer.c | 1 + source/dnode/mnode/impl/src/mndTopic.c | 2 ++ source/dnode/vnode/src/tq/tq.c | 8 +++++++- source/dnode/vnode/src/tq/tqPush.c | 12 ++++++------ source/libs/executor/src/projectoperator.c | 10 +++++++--- source/libs/executor/src/scanoperator.c | 2 ++ 7 files changed, 34 insertions(+), 19 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c9c02a77e1..e0f48a4534 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -515,7 +515,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg; topic = pMetaRspObj->topic; vgId = pMetaRspObj->vgId; - } else if(TD_RES_TMQ_METADATA(msg)) { + } else if (TD_RES_TMQ_METADATA(msg)) { SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg; topic = pRspObj->topic; vgId = pRspObj->vgId; @@ -715,7 +715,7 @@ void tmqSendHbReq(void* param, void* tmrId) { int32_t epoch = tmq->epoch; SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq)); if (pReq == NULL) goto OVER; - pReq->consumerId = consumerId; + pReq->consumerId = htobe64(consumerId); pReq->epoch = epoch; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -1661,9 +1661,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // build rsp void* pRsp = NULL; - if(pollRspWrapper->taosxRsp.createTableNum == 0){ + if (pollRspWrapper->taosxRsp.createTableNum == 0) { pRsp = tmqBuildRspFromWrapper(pollRspWrapper); - }else{ + } else { pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); } taosFreeQitem(pollRspWrapper); @@ -1850,12 +1850,12 @@ const char* tmq_get_table_name(TAOS_RES* res) { return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; - if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || - pRspObj->resIter >= pRspObj->rsp.blockNum) { - return NULL; - } - return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= pRspObj->rsp.blockNum) { + return NULL; } + return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + } return NULL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index abc23e3d95..3dfc10e554 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -272,6 +272,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { + mError("consumer %ld not exist", consumerId); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; return -1; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index b9647a28fb..7308dc375e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -379,6 +379,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; + qDebugL("ast %s", topicObj.ast); + SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { taosMemoryFree(topicObj.ast); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 21136405cb..490a313cd4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -65,6 +65,11 @@ static void destroySTqHandle(void* data) { } } +static void tqPushEntryFree(void* data) { + void* p = *(void**)data; + taosMemoryFree(p); +} + STQ* tqOpen(const char* path, SVnode* pVnode) { STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); if (pTq == NULL) { @@ -80,7 +85,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosInitRWLatch(&pTq->pushLock); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - taosHashSetFreeFp(pTq->pPushMgr, taosMemoryFree); + taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -548,6 +553,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pPushEntry != NULL) { pPushEntry->pHandle = pHandle; pPushEntry->pInfo = pMsg->info; + dataRsp.withTbName = 0; memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); pPushEntry->rspHead.consumerId = consumerId; pPushEntry->rspHead.epoch = reqEpoch; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index c42cfeb7b8..13ed36e2bc 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -245,7 +245,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - if (qExecTask(task, NULL, &ts) < 0) { + if (qExecTask(task, &pDataBlock, &ts) < 0) { ASSERT(0); } @@ -256,21 +256,19 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); pRsp->blockNum++; } + if (pRsp->blockNum > 0) { // set offset tqOffsetResetToLog(&pRsp->rspOffset, ver); // remove from hash size_t kLen; - void* key = taosHashGetKey(pPushEntry, &kLen); + void* key = taosHashGetKey(pIter, &kLen); void* keyCopy = taosMemoryMalloc(kLen); memcpy(keyCopy, key, kLen); taosArrayPush(cachedKeys, &keyCopy); taosArrayPush(cachedKeyLens, &kLen); - if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { - ASSERT(0); - } tqPushDataRsp(pTq, pPushEntry); } } @@ -278,7 +276,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { void* key = taosArrayGetP(cachedKeys, i); size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); - taosHashRemove(pTq->pPushMgr, key, kLen); + if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { + ASSERT(0); + } } taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); taosArrayDestroy(cachedKeyLens); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 2f12a0d19b..b7c5b5634e 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -53,7 +53,7 @@ static void destroyIndefinitOperatorInfo(void* param) { SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -184,7 +184,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pBlock, keepRows); - //TODO: optimize it later when partition by + limit + // TODO: optimize it later when partition by + limit if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { doSetOperatorCompleted(pOperator); @@ -206,6 +206,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { blockDataCleanup(pFinalRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pTaskInfo->streamInfo.pReq) { + pOperator->status = OP_OPENED; + } + if (pOperator->status == OP_EXEC_DONE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pOperator->status = OP_OPENED; @@ -254,7 +258,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } setInfoForNewGroup(pBlock, pLimitInfo, pOperator); - if (pOperator->status == OP_EXEC_DONE) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && pOperator->status == OP_EXEC_DONE) { break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fa05608ced..46280295b9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1443,6 +1443,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) { qError("submit msg messed up when initing stream submit block %p", pSubmit); pInfo->tqReader->pMsg = NULL; + pTaskInfo->streamInfo.pReq = NULL; ASSERT(0); } } @@ -1468,6 +1469,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { pInfo->tqReader->pMsg = NULL; pTaskInfo->streamInfo.pReq = NULL; + return NULL; } if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { From cde4085a6d43ea2825233244fbb8d6c2a41962b6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 28 Sep 2022 20:24:02 +0800 Subject: [PATCH 03/21] optimize msg batch --- include/common/tcommon.h | 1 + source/client/src/clientTmq.c | 6 +++++- source/dnode/vnode/src/tq/tq.c | 3 +++ source/dnode/vnode/src/tq/tqPush.c | 4 ++++ source/dnode/vnode/src/tq/tqRead.c | 9 +++++++-- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/projectoperator.c | 14 +++++++++++++- source/libs/executor/src/scanoperator.c | 10 ++++++---- 8 files changed, 40 insertions(+), 8 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2544cedda7..2add3332ab 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -177,6 +177,7 @@ typedef struct SSDataBlock { enum { FETCH_TYPE__DATA = 1, FETCH_TYPE__META, + FETCH_TYPE__SEP, FETCH_TYPE__NONE, }; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e0f48a4534..66f992f05f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1603,6 +1603,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; + tscDebug("consumer %ld actual process poll rsp", tmq->consumerId); /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { @@ -1718,7 +1719,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { while (1) { tmqHandleAllDelayedTask(tmq); - if (tmqPollImpl(tmq, timeout) < 0) return NULL; + if (tmqPollImpl(tmq, timeout) < 0) { + tscDebug("return since poll err"); + /*return NULL;*/ + } rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 490a313cd4..070abadee2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -420,6 +420,8 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su return -1; } + pRsp->withTbName = 0; +#if 0 pRsp->withTbName = pReq->withTbName; if (pRsp->withTbName) { pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); @@ -428,6 +430,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su return -1; } } +#endif if (subType == TOPIC_SUB_TYPE__COLUMN) { pRsp->withSchema = false; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 13ed36e2bc..e7d6c1eb47 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,6 +213,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { + tqDebug("vgId:%d tq push msg ver %ld", pTq->pVnode->config.vgId, ver); + if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); @@ -257,6 +259,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) pRsp->blockNum++; } + tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, + pPushEntry->pHandle->subKey, pRsp->blockNum); if (pRsp->blockNum > 0) { // set offset tqOffsetResetToLog(&pRsp->rspOffset, ver); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index d93a8fe030..3bd31e6660 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -314,14 +314,18 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { return -1; } void* body = pReader->pWalReader->pHead->head.body; +#if 0 if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { // TODO do filter ret->fetchType = FETCH_TYPE__META; ret->meta = pReader->pWalReader->pHead->head.body; return 0; } else { - tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version); +#endif + tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version); +#if 0 } +#endif } while (tqNextDataBlock(pReader)) { @@ -333,6 +337,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { continue; } ret->fetchType = FETCH_TYPE__DATA; + tqDebug("return data rows %d", ret->data.info.rows); return 0; } @@ -340,7 +345,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; ASSERT(pReader->ver >= 0); - ret->fetchType = FETCH_TYPE__NONE; + ret->fetchType = FETCH_TYPE__SEP; tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version); return 0; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 27760d7969..56470f0668 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -193,6 +193,7 @@ enum { OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, OP_EXEC_DONE = 0x9, + OP_EXEC_RECV = 0x11, }; typedef struct SOperatorFpSet { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index b7c5b5634e..8d40824cc4 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -241,9 +241,20 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // The downstream exec may change the value of the newgroup, so use a local variable instead. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pOperator->status == OP_EXEC_RECV && + pFinalRes->info.rows == 0) { + pOperator->status = OP_OPENED; + continue; + } + qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, + pFinalRes->info.rows); doSetOperatorCompleted(pOperator); break; } + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { + qDebug("set status recv"); + pOperator->status = OP_EXEC_RECV; + } // for stream interval if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT || @@ -258,7 +269,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } setInfoForNewGroup(pBlock, pLimitInfo, pOperator); - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && pOperator->status == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { break; } @@ -302,6 +313,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { + qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status); break; } } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 46280295b9..927ef2c64d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1504,8 +1504,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (setBlockIntoRes(pInfo, &ret.data) < 0) { ASSERT(0); } - // TODO clean data block if (pInfo->pRes->info.rows > 0) { + pOperator->status = OP_EXEC_RECV; qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); return pInfo->pRes; } @@ -1514,18 +1514,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { // pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.metaBlk = ret.meta; // return NULL; - } else if (ret.fetchType == FETCH_TYPE__NONE) { + } else if (ret.fetchType == FETCH_TYPE__NONE || + (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { pTaskInfo->streamInfo.lastStatus = ret.offset; ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); char formatBuf[80]; tFormatOffset(formatBuf, 80, &ret.offset); qDebug("queue scan log return null, offset %s", formatBuf); + pOperator->status = OP_OPENED; return NULL; - } else { - ASSERT(0); } } +#if 0 } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { @@ -1534,6 +1535,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } qDebug("stream scan tsdb return null"); return NULL; +#endif } else { ASSERT(0); return NULL; From fab0adde99fda3dbeb3a7eed1bdac2b8fa32825d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 28 Sep 2022 21:44:37 +0800 Subject: [PATCH 04/21] fix memory error --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 11 +++++++++-- source/dnode/vnode/src/tq/tqPush.c | 20 ++++++++++++++------ 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 04b0813445..f96afe6fba 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -116,7 +116,7 @@ typedef struct { typedef struct { SMqDataRsp dataRsp; SMqRspHead rspHead; - STqHandle* pHandle; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; SRpcHandleInfo pInfo; } STqPushEntry; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 070abadee2..d0e7d2c766 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -554,8 +554,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (dataRsp.blockNum == 0) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); if (pPushEntry != NULL) { - pPushEntry->pHandle = pHandle; pPushEntry->pInfo = pMsg->info; + memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); dataRsp.withTbName = 0; memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); pPushEntry->rspHead.consumerId = consumerId; @@ -704,7 +704,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + taosWLockLatch(&pTq->pushLock); + int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); + if (code != 0) { + tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); + } + taosWUnLockLatch(&pTq->pushLock); + + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index e7d6c1eb47..4bd47f4e83 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,11 +213,12 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - tqDebug("vgId:%d tq push msg ver %ld", pTq->pVnode->config.vgId, ver); + tqDebug("vgId:%d tq push msg ver %ld, type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); + tqDebug("vgId:%d push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); if (taosHashGetSize(pTq->pPushMgr) != 0) { SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); @@ -235,10 +236,17 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) while (1) { pIter = taosHashIterate(pTq->pPushMgr, pIter); if (pIter == NULL) break; - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - STqExecHandle* pExec = &pPushEntry->pHandle->execHandle; + STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; + + STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); + if (pHandle == NULL) { + tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); + continue; + } + STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; - SMqDataRsp* pRsp = &pPushEntry->dataRsp; + + SMqDataRsp* pRsp = &pPushEntry->dataRsp; // prepare scan mem data qStreamScanMemData(task, pReq); @@ -259,8 +267,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) pRsp->blockNum++; } - tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, - pPushEntry->pHandle->subKey, pRsp->blockNum); + tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey, + pRsp->blockNum); if (pRsp->blockNum > 0) { // set offset tqOffsetResetToLog(&pRsp->rspOffset, ver); From b0a46942c2a61168aa8899c8ec62a5aaf67b18ad Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 28 Sep 2022 23:47:47 +0800 Subject: [PATCH 05/21] rsp to consumer if offset move forward --- source/dnode/vnode/src/tq/tq.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d0e7d2c766..b08d2d52e0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -551,7 +551,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); #if 1 - if (dataRsp.blockNum == 0) { + if (dataRsp.blockNum == 0 && dataRsp.rspOffset.type == TMQ_OFFSET__LOG && + dataRsp.reqOffset.version == dataRsp.rspOffset.version) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); if (pPushEntry != NULL) { pPushEntry->pInfo = pMsg->info; From a2a8fe02113e6c21f8848d99e529f2a769d85743 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 29 Sep 2022 01:06:42 +0800 Subject: [PATCH 06/21] rsp consumer when taosd stop --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b08d2d52e0..e7eab7ab73 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -66,7 +66,7 @@ static void destroySTqHandle(void* data) { } static void tqPushEntryFree(void* data) { - void* p = *(void**)data; + STqPushEntry* p = *(void**)data; taosMemoryFree(p); } From 5d0357fc4741c7665539a4a6d0e1a36e2db85013 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 29 Sep 2022 11:14:35 +0800 Subject: [PATCH 07/21] fix: very long string serialization problem --- source/libs/nodes/src/nodesMsgFuncs.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 8d1d332ac7..cdc4e66e42 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -90,12 +90,12 @@ static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) { static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) { int32_t tlvLen = sizeof(STlv) + len; if (pEncoder->offset + tlvLen > pEncoder->allocSize) { - void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize * 2); + pEncoder->allocSize = TMAX(pEncoder->allocSize * 2, pEncoder->allocSize + pEncoder->offset + tlvLen); + void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize); if (NULL == pNewBuf) { return TSDB_CODE_OUT_OF_MEMORY; } pEncoder->pBuf = pNewBuf; - pEncoder->allocSize = pEncoder->allocSize * 2; } STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset); pTlv->type = htons(type); From bb1165f1e95404b128e1f12692361aaa8107bd06 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 29 Sep 2022 12:14:41 +0800 Subject: [PATCH 08/21] optimize projection return --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/executor/src/projectoperator.c | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e7eab7ab73..ed5a894416 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -214,7 +214,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", - TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + TD_VID(pTq->pVnode), pPushEntry->rspHead.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); return 0; } @@ -551,7 +551,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); #if 1 - if (dataRsp.blockNum == 0 && dataRsp.rspOffset.type == TMQ_OFFSET__LOG && + if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); if (pPushEntry != NULL) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 8d40824cc4..e9e6fed66a 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -210,9 +210,12 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->status = OP_OPENED; } + qDebug("enter project"); + if (pOperator->status == OP_EXEC_DONE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pOperator->status = OP_OPENED; + qDebug("projection in queue model, set status open and return null"); return NULL; } @@ -241,10 +244,13 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // The downstream exec may change the value of the newgroup, so use a local variable instead. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pOperator->status == OP_EXEC_RECV && - pFinalRes->info.rows == 0) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) { pOperator->status = OP_OPENED; - continue; + if (pOperator->status == OP_EXEC_RECV) { + continue; + } else { + return NULL; + } } qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows); From 1d9ed1134f779dd6fa86265618625913eb61ea1e Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 29 Sep 2022 13:46:37 +0800 Subject: [PATCH 09/21] build: release ver-3.0.1.4 --- cmake/cmake.version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.version b/cmake/cmake.version index 2d86335b3c..05094f10cc 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.1.3") + SET(TD_VER_NUMBER "3.0.1.4") ENDIF () IF (DEFINED VERCOMPATIBLE) From 0c48581c86b12ed56e5597165d4dc5d1b2db75c2 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 29 Sep 2022 13:52:09 +0800 Subject: [PATCH 10/21] check push ver --- source/dnode/vnode/src/tq/tqPush.c | 5 +++++ tests/script/jenkins/basic.txt | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 4bd47f4e83..dcfb07f0ff 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -243,6 +243,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); continue; } + if (pPushEntry->dataRsp.reqOffset.version > ver) { + tqDebug("vgId:%d push entry req version %ld, while push version %ld, skip", pTq->pVnode->config.vgId, + pPushEntry->dataRsp.reqOffset.version, ver); + continue; + } STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 161c878440..82f73a4fdd 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -303,7 +303,7 @@ ./test.sh -f tsim/insert/backquote.sim -m # unsupport ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m ./test.sh -f tsim/query/interval-offset.sim -m -./test.sh -f tsim/tmq/basic3.sim -m +# unsupport ./test.sh -f tsim/tmq/basic3.sim -m ./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/qnode/basic1.sim -m # unsupport ./test.sh -f tsim/mnode/basic1.sim -m From b8c088b5f2156773e0e534014e1c73ed9e41aa3b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 29 Sep 2022 13:59:37 +0800 Subject: [PATCH 11/21] temporarily disable node case --- tests/docs-examples-test/node.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/docs-examples-test/node.sh b/tests/docs-examples-test/node.sh index 0283904815..41acf7c7b4 100644 --- a/tests/docs-examples-test/node.sh +++ b/tests/docs-examples-test/node.sh @@ -23,7 +23,7 @@ node query_example.js node async_query_example.js -node subscribe_demo.js +# node subscribe_demo.js taos -s "drop topic if exists topic_name_example" taos -s "drop database if exists power" @@ -39,4 +39,4 @@ taos -s "drop database if exists test" node opentsdb_telnet_example.js taos -s "drop database if exists test" -node opentsdb_json_example.js \ No newline at end of file +node opentsdb_json_example.js From ee484470f3946ab9768759e217acb959979cfb8d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:13:44 +0800 Subject: [PATCH 12/21] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 2c3808a703..fe13ba303c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -240,7 +240,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); if (!pVnode->restored) { - vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg); + vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg); terrno = TSDB_CODE_APP_NOT_READY; vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY); rpcFreeCont(pMsg->pCont); @@ -805,6 +805,12 @@ bool vnodeIsReadyForRead(SVnode *pVnode) { return true; } + if (pVnode->restored) { + vDebug("vgId:%d, vnode not restore finished", pVnode->config.vgId); + terrno = TSDB_CODE_APP_NOT_READY; + return false; + } + vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync)); return false; From 4223be379a98facb73063daf4296d576db3fea7d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:15:26 +0800 Subject: [PATCH 13/21] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index fe13ba303c..f334c47b1b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -806,7 +806,7 @@ bool vnodeIsReadyForRead(SVnode *pVnode) { } if (pVnode->restored) { - vDebug("vgId:%d, vnode not restore finished", pVnode->config.vgId); + vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); terrno = TSDB_CODE_APP_NOT_READY; return false; } From 7e85a15c31cd8b5ae7875cf8d6c26f41f86397c9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:18:20 +0800 Subject: [PATCH 14/21] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f334c47b1b..119fa02e05 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -805,7 +805,7 @@ bool vnodeIsReadyForRead(SVnode *pVnode) { return true; } - if (pVnode->restored) { + if (!pVnode->restored) { vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); terrno = TSDB_CODE_APP_NOT_READY; return false; From f14d00a5d68074f7c105e45b6263766445bf8316 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:19:43 +0800 Subject: [PATCH 15/21] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 119fa02e05..e15783d92b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -797,6 +797,12 @@ bool vnodeIsLeader(SVnode *pVnode) { } bool vnodeIsReadyForRead(SVnode *pVnode) { + if (!pVnode->restored) { + vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); + terrno = TSDB_CODE_APP_NOT_READY; + return false; + } + if (syncIsReady(pVnode->sync)) { return true; } @@ -805,12 +811,6 @@ bool vnodeIsReadyForRead(SVnode *pVnode) { return true; } - if (!pVnode->restored) { - vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); - terrno = TSDB_CODE_APP_NOT_READY; - return false; - } - vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync)); return false; From 4df330351490a5ef867eb73b03f8092e481ecdd1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:24:45 +0800 Subject: [PATCH 16/21] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 14 +++----------- source/libs/sync/src/syncMain.c | 5 ++++- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index e15783d92b..a5e2726bbf 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -797,18 +797,10 @@ bool vnodeIsLeader(SVnode *pVnode) { } bool vnodeIsReadyForRead(SVnode *pVnode) { - if (!pVnode->restored) { - vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); - terrno = TSDB_CODE_APP_NOT_READY; - return false; - } - - if (syncIsReady(pVnode->sync)) { - return true; - } - if (syncIsReadyForRead(pVnode->sync)) { - return true; + if (pVnode->restored) { + return true; + } } vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId, diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3d79c6c464..dc9bc6cbdc 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -405,7 +405,10 @@ bool syncIsReadyForRead(int64_t rid) { // TODO: last not noop? SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); - bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE); + bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; + if (!b) { + b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE); + } taosReleaseRef(tsNodeRefId, pSyncNode->rid); // if false, set error code From 586175ecfa6b573df63fe8808ff69cd53f8c013c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 29 Sep 2022 14:25:56 +0800 Subject: [PATCH 17/21] check vnode status for poll msg --- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b62a4dfbdc..1040000363 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -316,6 +316,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return 0; } + if (pMsg->msgType == TDMT_VND_CONSUME && !pVnode->restored) { + vnodeRedirectRpcMsg(pVnode, pMsg); + return 0; + } + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); From c56dd863998a4b07d93ac99e340aa46dbebd78aa Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:29:30 +0800 Subject: [PATCH 18/21] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/inc/vnd.h | 1 - source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSync.c | 11 ----------- source/libs/sync/src/syncMain.c | 26 -------------------------- 4 files changed, 2 insertions(+), 40 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 898e79928b..900d29b97e 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -99,7 +99,6 @@ void vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); bool vnodeIsLeader(SVnode* pVnode); -bool vnodeIsReadyForRead(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6e9eba306a..a9bc3ef875 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -289,7 +289,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) { + if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } @@ -311,7 +311,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && - !vnodeIsReadyForRead(pVnode)) { + !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a5e2726bbf..980761cd14 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -796,14 +796,3 @@ bool vnodeIsLeader(SVnode *pVnode) { return true; } -bool vnodeIsReadyForRead(SVnode *pVnode) { - if (syncIsReadyForRead(pVnode->sync)) { - if (pVnode->restored) { - return true; - } - } - - vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId, - syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync)); - return false; -} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index dc9bc6cbdc..52feb625a8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -396,32 +396,6 @@ bool syncIsReady(int64_t rid) { return b; } -bool syncIsReadyForRead(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return false; - } - ASSERT(rid == pSyncNode->rid); - - // TODO: last not noop? - SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); - bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; - if (!b) { - b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE); - } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - - // if false, set error code - if (false == b) { - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - terrno = TSDB_CODE_SYN_NOT_LEADER; - } else { - terrno = TSDB_CODE_APP_NOT_READY; - } - } - return b; -} - bool syncIsRestoreFinish(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { From f1d492f62875468b4d22f004be0d569c419c004d Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Thu, 29 Sep 2022 16:18:47 +0800 Subject: [PATCH 19/21] doc: change rest connection sample as the default in tabs --- docs/en/14-reference/03-connector/04-java.mdx | 2 +- docs/en/14-reference/03-connector/05-go.mdx | 2 +- docs/en/14-reference/03-connector/06-rust.mdx | 22 ++++++++++--------- .../14-reference/03-connector/07-python.mdx | 2 +- docs/en/14-reference/03-connector/08-node.mdx | 4 ++-- docs/zh/08-connector/04-java.mdx | 2 +- docs/zh/08-connector/05-go.mdx | 2 +- docs/zh/08-connector/06-rust.mdx | 21 +++++++++--------- docs/zh/08-connector/07-python.mdx | 2 +- docs/zh/08-connector/08-node.mdx | 4 ++-- 10 files changed, 33 insertions(+), 30 deletions(-) diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index 0f977393f1..afa3200387 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -109,7 +109,7 @@ TDengine's JDBC URL specification format is: For establishing connections, native connections differ slightly from REST connections. - + ```java diff --git a/docs/en/14-reference/03-connector/05-go.mdx b/docs/en/14-reference/03-connector/05-go.mdx index 00e3bc1bc3..1bf99d72fa 100644 --- a/docs/en/14-reference/03-connector/05-go.mdx +++ b/docs/en/14-reference/03-connector/05-go.mdx @@ -113,7 +113,7 @@ username:password@protocol(address)/dbname?param=value ``` ### Connecting via connector - + _taosSql_ implements Go's `database/sql/driver` interface via cgo. You can use the [`database/sql`](https://golang.org/pkg/database/sql/) interface by simply introducing the driver. diff --git a/docs/en/14-reference/03-connector/06-rust.mdx b/docs/en/14-reference/03-connector/06-rust.mdx index 1184c98a28..9d2e840e9e 100644 --- a/docs/en/14-reference/03-connector/06-rust.mdx +++ b/docs/en/14-reference/03-connector/06-rust.mdx @@ -55,16 +55,6 @@ taos = "*" - - -In `cargo.toml`, add [taos][taos] and enable the native feature: - -```toml -[dependencies] -taos = { version = "*", default-features = false, features = ["native"] } -``` - - In `cargo.toml`, add [taos][taos] and enable the ws feature: @@ -75,6 +65,18 @@ taos = { version = "*", default-features = false, features = ["ws"] } ``` + + + +In `cargo.toml`, add [taos][taos] and enable the native feature: + +```toml +[dependencies] +taos = { version = "*", default-features = false, features = ["native"] } +``` + + + ## Establishing a connection diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index fc95033baa..355ba06ee4 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -81,7 +81,7 @@ pip3 install git+https://github.com/taosdata/taos-connector-python.git ### Verify - + For native connection, you need to verify that both the client driver and the Python connector itself are installed correctly. The client driver and Python connector have been installed properly if you can successfully import the `taos` module. In the Python Interactive Shell, you can type. diff --git a/docs/en/14-reference/03-connector/08-node.mdx b/docs/en/14-reference/03-connector/08-node.mdx index f93632b417..259ee590ef 100644 --- a/docs/en/14-reference/03-connector/08-node.mdx +++ b/docs/en/14-reference/03-connector/08-node.mdx @@ -85,7 +85,7 @@ If using ARM64 Node.js on Windows 10 ARM, you must add "Visual C++ compilers and ### Install via npm - + ```bash @@ -124,7 +124,7 @@ node nodejsChecker.js host=localhost Please choose to use one of the connectors. - + Install and import the `@tdengine/client` package. diff --git a/docs/zh/08-connector/04-java.mdx b/docs/zh/08-connector/04-java.mdx index 6b1715f8c6..d78da52aaa 100644 --- a/docs/zh/08-connector/04-java.mdx +++ b/docs/zh/08-connector/04-java.mdx @@ -109,7 +109,7 @@ TDengine 的 JDBC URL 规范格式为: 对于建立连接,原生连接与 REST 连接有细微不同。 - + ```java diff --git a/docs/zh/08-connector/05-go.mdx b/docs/zh/08-connector/05-go.mdx index 9d30f75190..515d1b030b 100644 --- a/docs/zh/08-connector/05-go.mdx +++ b/docs/zh/08-connector/05-go.mdx @@ -114,7 +114,7 @@ username:password@protocol(address)/dbname?param=value ``` ### 使用连接器进行连接 - + _taosSql_ 通过 cgo 实现了 Go 的 `database/sql/driver` 接口。只需要引入驱动就可以使用 [`database/sql`](https://golang.org/pkg/database/sql/) 的接口。 diff --git a/docs/zh/08-connector/06-rust.mdx b/docs/zh/08-connector/06-rust.mdx index 26f53c82d6..42cd5205cd 100644 --- a/docs/zh/08-connector/06-rust.mdx +++ b/docs/zh/08-connector/06-rust.mdx @@ -55,16 +55,6 @@ taos = "*" - - -在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性: - -```toml -[dependencies] -taos = { version = "*", default-features = false, features = ["native"] } -``` - - 在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。 @@ -74,6 +64,17 @@ taos = { version = "*", default-features = false, features = ["native"] } taos = { version = "*", default-features = false, features = ["ws"] } ``` + + + + +在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性: + +```toml +[dependencies] +taos = { version = "*", default-features = false, features = ["native"] } +``` + diff --git a/docs/zh/08-connector/07-python.mdx b/docs/zh/08-connector/07-python.mdx index 0242486d3b..2e80da4283 100644 --- a/docs/zh/08-connector/07-python.mdx +++ b/docs/zh/08-connector/07-python.mdx @@ -80,7 +80,7 @@ pip3 install git+https://github.com/taosdata/taos-connector-python.git ### 安装验证 - + 对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入: diff --git a/docs/zh/08-connector/08-node.mdx b/docs/zh/08-connector/08-node.mdx index 167ae069d6..1de19cdd3a 100644 --- a/docs/zh/08-connector/08-node.mdx +++ b/docs/zh/08-connector/08-node.mdx @@ -85,7 +85,7 @@ REST 连接器支持所有能运行 Node.js 的平台。 ### 使用 npm 安装 - + ```bash @@ -124,7 +124,7 @@ node nodejsChecker.js host=localhost 请选择使用一种连接器。 - + 安装并引用 `@tdengine/client` 包。 From 129f494e7ec03be1ada6a64ce3bade5688b0a017 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 29 Sep 2022 16:34:18 +0800 Subject: [PATCH 20/21] docs: add table name customization note (#17153) --- docs/en/20-third-party/03-telegraf.md | 20 +++++++++++++++----- docs/zh/20-third-party/03-telegraf.md | 20 +++++++++++++++----- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/docs/en/20-third-party/03-telegraf.md b/docs/en/20-third-party/03-telegraf.md index 6a7aac322f..e5c6d6f254 100644 --- a/docs/en/20-third-party/03-telegraf.md +++ b/docs/en/20-third-party/03-telegraf.md @@ -15,6 +15,7 @@ To write Telegraf data to TDengine requires the following preparations. - The TDengine cluster is deployed and functioning properly - taosAdapter is installed and running properly. Please refer to the [taosAdapter manual](/reference/taosadapter) for details. - Telegraf has been installed. Please refer to the [official documentation](https://docs.influxdata.com/telegraf/v1.22/install/) for Telegraf installation. +- Telegraf collects the running status measurements of current system. You can enable [input plugins](https://docs.influxdata.com/telegraf/v1.22/plugins/) to insert [other formats](https://docs.influxdata.com/telegraf/v1.24/data_formats/input/) data to Telegraf then forward to TDengine. ## Configuration steps @@ -31,11 +32,12 @@ Use TDengine CLI to verify Telegraf correctly writing data to TDengine and read ``` taos> show databases; - name | created_time | ntables | vgroups | replica | quorum | days | keep | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | cachelast | precision | update | status | -==================================================================================================================================================================================================================================================================================== - telegraf | 2022-04-20 08:47:53.488 | 22 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ns | 2 | ready | - log | 2022-04-20 07:19:50.260 | 9 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ms | 0 | ready | -Query OK, 2 row(s) in set (0.002401s) + name | +================================= + information_schema | + performance_schema | + telegraf | +Query OK, 3 rows in database (0.010568s) taos> use telegraf; Database changed. @@ -65,3 +67,11 @@ taos> select * from telegraf.system limit 10; | Query OK, 3 row(s) in set (0.013269s) ``` + +:::note + +- TDengine take influxdb format data and create unique ID for table names by the rule. +The user can configure `smlChildTableName` paramter to generate specified table names if he/she needs. And he/she also need to insert data with specified data format. +For example, Add `smlChildTableName=tname` in the taos.cfg file. Insert data `st,tname=cpu1,t1=4 c1=3 1626006833639000000` then the table name will be cpu1. If there are multiple lines has same tname but different tag_set, the first line's tag_set will be used to automatically creating table and ignore other lines. Please refer to [TDengine Schemaless](/reference/schemaless/#Schemaless-Line-Protocol) +::: + diff --git a/docs/zh/20-third-party/03-telegraf.md b/docs/zh/20-third-party/03-telegraf.md index 84883e665a..71bb1b3885 100644 --- a/docs/zh/20-third-party/03-telegraf.md +++ b/docs/zh/20-third-party/03-telegraf.md @@ -16,6 +16,7 @@ Telegraf 是一款十分流行的指标采集开源软件。在数据采集和 - TDengine 集群已经部署并正常运行 - taosAdapter 已经安装并正常运行。具体细节请参考 [taosAdapter 的使用手册](/reference/taosadapter) - Telegraf 已经安装。安装 Telegraf 请参考[官方文档](https://docs.influxdata.com/telegraf/v1.22/install/) +- Telegraf 默认采集系统运行状态数据。通过使能[输入插件](https://docs.influxdata.com/telegraf/v1.22/plugins/)方式可以输出[其他格式](https://docs.influxdata.com/telegraf/v1.24/data_formats/input/)的数据到 Telegraf 再写入到 TDengine中。 ## 配置步骤 @@ -32,11 +33,12 @@ sudo systemctl restart telegraf ``` taos> show databases; - name | created_time | ntables | vgroups | replica | quorum | days | keep | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | cachelast | precision | update | status | -==================================================================================================================================================================================================================================================================================== - telegraf | 2022-04-20 08:47:53.488 | 22 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ns | 2 | ready | - log | 2022-04-20 07:19:50.260 | 9 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ms | 0 | ready | -Query OK, 2 row(s) in set (0.002401s) + name | +================================= + information_schema | + performance_schema | + telegraf | +Query OK, 3 rows in database (0.010568s) taos> use telegraf; Database changed. @@ -66,3 +68,11 @@ taos> select * from telegraf.system limit 10; | Query OK, 3 row(s) in set (0.013269s) ``` + +:::note + +- TDengine 接收 influxdb 格式数据默认生成的子表名是根据规则生成的唯一 ID 值。 +用户如需指定生成的表名,可以通过在 taos.cfg 里配置 smlChildTableName 参数来指定。如果通过控制输入数据格式,即可利用 TDengine 这个功能指定生成的表名。 +举例如下:配置 smlChildTableName=tname 插入数据为 st,tname=cpu1,t1=4 c1=3 1626006833639000000 则创建的表名为 cpu1。如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。[TDengine 无模式写入参考指南](/reference/schemaless/#无模式写入行协议) +::: + From 075d8c4933b36145a17d2072dcc97c5dd6171555 Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Thu, 29 Sep 2022 16:59:47 +0800 Subject: [PATCH 21/21] doc: change default tab to rest --- docs/zh/08-connector/30-python.mdx | 10 +++++----- docs/zh/08-connector/40-csharp.mdx | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx index 2e80da4283..c70677f816 100644 --- a/docs/zh/08-connector/30-python.mdx +++ b/docs/zh/08-connector/30-python.mdx @@ -80,7 +80,7 @@ pip3 install git+https://github.com/taosdata/taos-connector-python.git ### 安装验证 - + 对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入: @@ -118,7 +118,7 @@ Requirement already satisfied: taospy in c:\users\username\appdata\local\program 在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。 - + 请确保 TDengine 集群已经启动, 且集群中机器的 FQDN (如果启动的是单机版,FQDN 默认为 hostname)在本机能够解析, 可用 `ping` 命令进行测试: @@ -173,7 +173,7 @@ curl -u root:taosdata http://:/rest/sql -d "select server_version()" 以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。 - + ```python @@ -219,7 +219,7 @@ curl -u root:taosdata http://:/rest/sql -d "select server_version()" ### 基本使用 - + ##### TaosConnection 类的使用 @@ -289,7 +289,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 ### 与 pandas 一起使用 - + ```python diff --git a/docs/zh/08-connector/40-csharp.mdx b/docs/zh/08-connector/40-csharp.mdx index e99f41ae9c..70c0382080 100644 --- a/docs/zh/08-connector/40-csharp.mdx +++ b/docs/zh/08-connector/40-csharp.mdx @@ -35,7 +35,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx" ## 支持的功能特性 - + @@ -96,7 +96,7 @@ dotnet add exmaple.csproj reference src/TDengine.csproj ## 建立连接 - + @@ -171,7 +171,7 @@ namespace TDengineExample #### SQL 写入 - + @@ -203,7 +203,7 @@ namespace TDengineExample #### 参数绑定 - + @@ -227,7 +227,7 @@ namespace TDengineExample #### 同步查询 - +