From 1350af5267d57a9f4754dce505d90e99ce8bc401 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 16 Apr 2023 22:48:22 +0800 Subject: [PATCH 01/18] fix(stream): set the correct initial checkpoint version to restore the operators state and add check for the initial destination tables. --- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tq.c | 35 ++++++++++++++----------- source/dnode/vnode/src/tq/tqRestore.c | 7 ++--- source/dnode/vnode/src/tq/tqUtil.c | 6 +++++ source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/executor/inc/executil.h | 1 + source/libs/executor/inc/executorimpl.h | 1 - source/libs/executor/src/executil.c | 11 +++++++- source/libs/executor/src/executor.c | 26 +++++++++++++++--- source/libs/executor/src/scanoperator.c | 1 - source/util/src/tworker.c | 4 +-- 11 files changed, 66 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c007f84790..db17e4f533 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -179,6 +179,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqStreamTasksScanWal(STQ* pTq); // tq util +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1230a352d9..a641d44dba 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -20,6 +20,8 @@ // 2: wait to be inited or cleaup #define WAL_READ_TASKS_ID (-1) +static int32_t tqInitialize(STQ* pTq); + int32_t tqInit() { int8_t old; while (1) { @@ -109,25 +111,30 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); + tqInitialize(pVnode->pTq); + return pTq; +} + +int32_t tqInitialize(STQ* pTq) { if (tqMetaOpen(pTq) < 0) { - return NULL; + return -1; } pTq->pOffsetStore = tqOffsetOpen(pTq); if (pTq->pOffsetStore == NULL) { - return NULL; + return -1; } - pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId); + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId); if (pTq->pStreamMeta == NULL) { - return NULL; + return -1; } - if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) { - return NULL; + if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) { + return -1; } - return pTq; + return 0; } void tqClose(STQ* pTq) { @@ -547,13 +554,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return 0; } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { - // todo extract method - char buf[128] = {0}; - sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId); - +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t UNUSED_PARAM(ver)) { int32_t vgId = TD_VID(pTq->pVnode); - pTask->id.idStr = taosStrdup(buf); + pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(); @@ -567,7 +570,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; - pTask->chkInfo.version = ver; // expand executor if (pTask->fillHistory) { @@ -633,8 +635,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); + + // next valid version will add one + pTask->chkInfo.version += 1; return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 6ed74ddcc3..cba51cdee4 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -52,9 +52,6 @@ int tqStreamTasksScanWal(STQ* pTq) { double el = (taosGetTimestampMs() - st) / 1000.0; tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el); - - // restore wal scan flag -// atomic_store_8(&pTq->pStreamMeta->walScan, 0); return 0; } @@ -99,8 +96,8 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto continue; } - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || - pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); continue; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 4c37e1052f..00bff5da5d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -19,6 +19,12 @@ static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { + char buf[128] = {0}; + sprintf(buf, "0x%" PRIx64 "-%d", streamId, taskId); + return taosStrdup(buf); +} + // stream_task:stream_id:task_id void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { int32_t n = 12; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d4a394b584..dc2d709d76 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; - vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); + vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId); // start to restore all stream tasks tqStartStreamTasks(pVnode->pTq); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 9b8f034e44..c50fc86dfa 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -108,6 +108,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList); uint64_t tableListGetSuid(const STableListInfo* pTableList); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex); +void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, int32_t* type); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2cb6626b03..85424fd7de 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -427,7 +427,6 @@ typedef struct STimeWindowAggSupp { } STimeWindowAggSupp; typedef struct SStreamScanInfo { - uint64_t tableUid; // queried super table uid SExprInfo* pPseudoExpr; int32_t numOfPseudoExpr; SExprSupp tbnameCalSup; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 7d318786ba..33698522cd 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -36,6 +36,7 @@ struct STableListInfo { SArray* pTableList; SHashObj* map; // speedup acquire the tableQueryInfo by table uid uint64_t suid; + int32_t tableType; // queried table type }; typedef struct tagFilterAssist { @@ -1026,14 +1027,17 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, size_t numOfTables = 0; pListInfo->suid = pScanNode->suid; + pListInfo->tableType = pScanNode->tableType; + SArray* pUidList = taosArrayInit(8, sizeof(uint64_t)); SIdxFltStatus status = SFLT_NOT_INDEX; if (pScanNode->tableType != TSDB_SUPER_TABLE) { + pListInfo->suid = pScanNode->uid; + if (metaIsTableExist(metaHandle, pScanNode->uid)) { taosArrayPush(pUidList, &pScanNode->uid); } - code = doFilterByTagCond(pListInfo, pUidList, pTagCond, metaHandle, status); if (code != TSDB_CODE_SUCCESS) { goto _end; @@ -1819,6 +1823,11 @@ int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t st return -1; } +void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, int32_t* type) { + *psuid = pTableList->suid; + *type = pTableList->tableType; +} + uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); ASSERT(pTableList->map != NULL && slot != NULL); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6e3a7d8725..6b6ee931ba 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,6 +14,7 @@ */ #include "executor.h" +#include #include "executorimpl.h" #include "planner.h" #include "tdatablock.h" @@ -327,6 +328,13 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S return qa; } + STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; + + uint64_t suid = 0; + int32_t type = 0; + tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &type); + int32_t numOfExisted = tableListGetSize(pTableScanInfo->base.pTableListInfo); + // let's discard the tables those are not created according to the queried super table. SMetaReader mr = {0}; metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); @@ -341,9 +349,21 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S tDecoderClear(&mr.coder); - // TODO handle ntb case - if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) { + if (mr.me.type == TSDB_SUPER_TABLE) { continue; + } else { + if (type == TSDB_SUPER_TABLE) { + // this new created child table does not belong to the scanned super table. + if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) { + continue; + } + } else { // ordinary table + // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we + // should check all newly created ordinary table to make sure that this table isn't the destination table. + if (mr.me.uid != suid) { + continue; + } + } } if (pScanInfo->pTagCond != NULL) { @@ -382,7 +402,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI SStreamScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id - SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); + SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, id); int32_t numOfQualifiedTables = taosArrayGetSize(qa); qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ae396a4c68..52c04aecd8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2441,7 +2441,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } pInfo->readHandle = *pHandle; - pInfo->tableUid = pScanPhyNode->uid; pTaskInfo->streamInfo.snapshotVer = pHandle->version; pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup); blockDataEnsureCapacity(pInfo->pCreateTbRes, 8); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index a49ff0cd5b..6edee27c05 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -217,8 +217,8 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem int32_t queueNum = taosGetQueueNumber(pool->qset); int32_t curWorkerNum = taosArrayGetSize(pool->workers); - int32_t dstWorkerNum = ceil(queueNum * pool->ratio); - if (dstWorkerNum < 2) dstWorkerNum = 2; + int32_t dstWorkerNum = ceilf(queueNum * pool->ratio); + if (dstWorkerNum < 1) dstWorkerNum = 1; // spawn a thread to process queue while (curWorkerNum < dstWorkerNum) { From be90d2c511a3cd3756eea011cbc75f4b9fe3de6d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 16 Apr 2023 23:07:54 +0800 Subject: [PATCH 02/18] fix(stream): disable stream task when no tasks exist. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 11 ++++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 103f807191..9e0a2826c5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -346,7 +346,6 @@ typedef struct SStreamMeta { int32_t vgId; SRWLatch lock; int8_t walScan; - bool quit; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a641d44dba..190a0893a8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -111,7 +111,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); - tqInitialize(pVnode->pTq); + tqInitialize(pTq); return pTq; } @@ -1281,6 +1281,13 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; taosWLockLatch(&pMeta->lock); + int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); + if (numOfTasks == 0) { + tqInfo("vgId:%d no stream tasks exists", vgId); + taosWUnLockLatch(&pTq->pStreamMeta->lock); + return 0; + } + pMeta->walScan += 1; if (pMeta->walScan > 1) { @@ -1297,8 +1304,6 @@ int32_t tqStartStreamTasks(STQ* pTq) { return -1; } - int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); - tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); initOffsetForAllRestoreTasks(pTq); From 10b3fd9426a945159c95f56aff165f35b2595069 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Apr 2023 09:19:48 +0800 Subject: [PATCH 03/18] other: merge main. --- source/libs/stream/src/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7171b52912..3b9306c2cf 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,7 +16,7 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 2000 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 100000 int32_t streamInit() { int8_t old; From ac137b4b33502fba06715e5b0827b219a3a4c9ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Apr 2023 09:46:58 +0800 Subject: [PATCH 04/18] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 27 ++++++++++++--------------- source/libs/stream/src/stream.c | 4 ++++ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9e0a2826c5..aade34e965 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -335,17 +335,17 @@ struct SStreamTask { // meta typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pCheckpointDb; - SHashObj* pTasks; - void* ahandle; - TXN* txn; - FTaskExpand* expandFunc; - int32_t vgId; - SRWLatch lock; - int8_t walScan; + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pCheckpointDb; + SHashObj* pTasks; + void* ahandle; + TXN* txn; + FTaskExpand* expandFunc; + int32_t vgId; + SRWLatch lock; + int8_t walScan; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -358,10 +358,6 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); bool tInputQueueIsFull(const SStreamTask* pTask); -static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); -} - typedef struct { SMsgHead head; int64_t streamId; @@ -537,6 +533,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); // int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); +void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 3b9306c2cf..0f000f1f50 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -352,4 +352,8 @@ void* streamQueueNextItem(SStreamQueue* queue) { } return streamQueueCurItem(queue); } +} + +void streamTaskInputFail(SStreamTask* pTask) { + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file From eb7f510ccbfcbe3e92ca90c70a2c91e004642c4a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Apr 2023 10:59:24 +0800 Subject: [PATCH 05/18] fix(query): return correct suid to delete sink. --- include/libs/executor/dataSinkMgt.h | 1 - source/libs/executor/src/executil.c | 18 +++++++++++++----- source/libs/executor/src/executor.c | 9 ++++----- source/libs/executor/src/scanoperator.c | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index ed7cbc8125..ce7d038d42 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -29,7 +29,6 @@ extern "C" { #define DS_BUF_FULL 2 #define DS_BUF_EMPTY 3 -struct SDataSink; struct SSDataBlock; typedef struct SDeleterRes { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 33698522cd..af68cd5990 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -35,8 +35,11 @@ struct STableListInfo { int32_t* groupOffset; // keep the offset value for each group in the tableList SArray* pTableList; SHashObj* map; // speedup acquire the tableQueryInfo by table uid - uint64_t suid; - int32_t tableType; // queried table type + union { + uint64_t suid; + uint64_t uid; + }; // this maybe the super table or ordinary table + int32_t tableType; // queried table type }; typedef struct tagFilterAssist { @@ -1033,8 +1036,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SIdxFltStatus status = SFLT_NOT_INDEX; if (pScanNode->tableType != TSDB_SUPER_TABLE) { - pListInfo->suid = pScanNode->uid; - + pListInfo->uid = pScanNode->uid; if (metaIsTableExist(metaHandle, pScanNode->uid)) { taosArrayPush(pUidList, &pScanNode->uid); } @@ -1798,7 +1800,13 @@ uint64_t tableListGetSize(const STableListInfo* pTableList) { return taosArrayGetSize(pTableList->pTableList); } -uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->suid; } +uint64_t tableListGetSuid(const STableListInfo* pTableList) { + if (pTableList->tableType == TSDB_SUPER_TABLE) { + return pTableList->suid; + } else { // query normal table, no suid exists. + return 0; + } +} STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) { if (taosArrayGetSize(pTableList->pTableList) == 0) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6b6ee931ba..33697a4033 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -330,10 +330,9 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; - uint64_t suid = 0; + uint64_t uid = 0; int32_t type = 0; - tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &type); - int32_t numOfExisted = tableListGetSize(pTableScanInfo->base.pTableListInfo); + tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &uid, &type); // let's discard the tables those are not created according to the queried super table. SMetaReader mr = {0}; @@ -354,13 +353,13 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S } else { if (type == TSDB_SUPER_TABLE) { // this new created child table does not belong to the scanned super table. - if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) { + if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != uid) { continue; } } else { // ordinary table // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we // should check all newly created ordinary table to make sure that this table isn't the destination table. - if (mr.me.uid != suid) { + if (mr.me.uid != uid) { continue; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 52c04aecd8..2389c7252e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -690,7 +690,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } uint32_t status = 0; - int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); + code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } From 4ed26bbc1998a4b62a0f3316defe9a82703b9afc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Apr 2023 14:08:54 +0800 Subject: [PATCH 06/18] fix(stream): update the table list api. --- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 51 ++++++++++++++--------------- source/libs/executor/src/executor.c | 5 +-- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index c50fc86dfa..2e92f9e396 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -108,7 +108,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList); uint64_t tableListGetSuid(const STableListInfo* pTableList); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex); -void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, int32_t* type); +void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index af68cd5990..f61fd1ae01 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -27,19 +27,21 @@ #include "executorimpl.h" #include "tcompression.h" +typedef struct STableListIdInfo { + uint64_t suid; + uint64_t uid; + int32_t tableType; +} STableListIdInfo; + // If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly // The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups struct STableListInfo { - bool oneTableForEachGroup; - int32_t numOfOuputGroups; // the data block will be generated one by one - int32_t* groupOffset; // keep the offset value for each group in the tableList - SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid - union { - uint64_t suid; - uint64_t uid; - }; // this maybe the super table or ordinary table - int32_t tableType; // queried table type + bool oneTableForEachGroup; + int32_t numOfOuputGroups; // the data block will be generated one by one + int32_t* groupOffset; // keep the offset value for each group in the tableList + SArray* pTableList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid + STableListIdInfo idInfo; // this maybe the super table or ordinary table }; typedef struct tagFilterAssist { @@ -474,7 +476,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } // int64_t stt = taosGetTimestampUs(); - code = metaGetTableTags(metaHandle, pTableListInfo->suid, pUidTagList); + code = metaGetTableTags(metaHandle, pTableListInfo->idInfo.suid, pUidTagList); if (code != TSDB_CODE_SUCCESS) { goto end; } @@ -957,7 +959,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN FilterCondType condType = checkTagCond(pTagCond); - int32_t filter = optimizeTbnameInCond(metaHandle, pListInfo->suid, pUidTagList, pTagCond); + int32_t filter = optimizeTbnameInCond(metaHandle, pListInfo->idInfo.suid, pUidTagList, pTagCond); if (filter == 0) { // tbname in filter is activated, do nothing and return taosArrayClear(pUidList); @@ -970,12 +972,12 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN terrno = 0; } else { if ((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) { - code = metaGetTableTagsByUids(metaHandle, pListInfo->suid, pUidTagList); + code = metaGetTableTagsByUids(metaHandle, pListInfo->idInfo.suid, pUidTagList); } else { - code = metaGetTableTags(metaHandle, pListInfo->suid, pUidTagList); + code = metaGetTableTags(metaHandle, pListInfo->idInfo.suid, pUidTagList); } if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->suid); + qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid); terrno = code; goto end; } @@ -1029,14 +1031,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, int32_t code = TSDB_CODE_SUCCESS; size_t numOfTables = 0; - pListInfo->suid = pScanNode->suid; - pListInfo->tableType = pScanNode->tableType; + pListInfo->idInfo.suid = pScanNode->suid; + pListInfo->idInfo.tableType = pScanNode->tableType; SArray* pUidList = taosArrayInit(8, sizeof(uint64_t)); SIdxFltStatus status = SFLT_NOT_INDEX; if (pScanNode->tableType != TSDB_SUPER_TABLE) { - pListInfo->uid = pScanNode->uid; + pListInfo->idInfo.uid = pScanNode->uid; if (metaIsTableExist(metaHandle, pScanNode->uid)) { taosArrayPush(pUidList, &pScanNode->uid); } @@ -1801,11 +1803,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList) { } uint64_t tableListGetSuid(const STableListInfo* pTableList) { - if (pTableList->tableType == TSDB_SUPER_TABLE) { - return pTableList->suid; - } else { // query normal table, no suid exists. - return 0; - } + return pTableList->idInfo.suid; } STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) { @@ -1831,9 +1829,10 @@ int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t st return -1; } -void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, int32_t* type) { - *psuid = pTableList->suid; - *type = pTableList->tableType; +void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) { + *psuid = pTableList->idInfo.suid; + *uid = pTableList->idInfo.uid; + *type = pTableList->idInfo.tableType; } uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 33697a4033..181ab3d44c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -330,9 +330,10 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; + uint64_t suid = 0; uint64_t uid = 0; int32_t type = 0; - tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &uid, &type); + tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type); // let's discard the tables those are not created according to the queried super table. SMetaReader mr = {0}; @@ -353,7 +354,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S } else { if (type == TSDB_SUPER_TABLE) { // this new created child table does not belong to the scanned super table. - if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != uid) { + if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) { continue; } } else { // ordinary table From d21628b2c9f8854b97ebc284ab7a9cd18b70e800 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 17 Apr 2023 16:57:29 +0800 Subject: [PATCH 07/18] fix: fix double free caused crash --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 89686c3d33..ccb4cce199 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4236,7 +4236,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { static void freeSchemaFunc(void* param) { void* p = *(void**)param; - taosMemoryFree(p); + taosMemoryFreeClear(p); } // ====================================== EXPOSED APIs ====================================== From 930b267a75fe975a88b75af01f8f676176eecd38 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Apr 2023 22:59:00 +0800 Subject: [PATCH 08/18] fix(stream): set the correct start offset for stream task. --- source/libs/stream/src/streamDispatch.c | 12 +++++------- source/libs/stream/src/streamExec.c | 5 ++++- source/libs/stream/src/streamMeta.c | 3 +-- source/libs/stream/src/streamRecover.c | 8 ++++++-- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a9f6d29bf5..549374ed94 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -230,23 +230,21 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) { - goto FAIL; + rpcFreeCont(buf); + return code; } + tEncoderClear(&encoder); msg.contLen = tlen + sizeof(SMsgHead); msg.pCont = buf; msg.msgType = TDMT_STREAM_TASK_CHECK; - qDebug("dispatch from s-task:%s to downstream s-task:%"PRIx64":%d node %d: check msg", pTask->id.idStr, - pReq->streamId, pReq->downstreamTaskId, nodeId); + qDebug("dispatch from s-task:%s to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr, + pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); - return 0; -FAIL: - if (buf) rpcFreeCont(buf); - return code; } int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3d896c08ac..9a6ff302ef 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -189,7 +189,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt); streamDispatch(pTask); } - if (finished) break; + + if (finished) { + break; + } } return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 51cc315780..860514bb44 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -296,8 +296,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); - // todo set correct initial version. - if (pMeta->expandFunc(pMeta->ahandle, pTask, 0) < 0) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 03afc0692d..55c745e417 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -102,8 +102,10 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp .downstreamNodeId = pRsp->downstreamNodeId, .childId = pRsp->childId, }; - qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->id.taskId, pTask->nodeId, + + qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -116,6 +118,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp } } } + return 0; } @@ -158,9 +161,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* } else { ASSERT(0); } - } else { + } else { // not ready, it should wait for at least 100ms and then retry streamRecheckOneDownstream(pTask, pRsp); } + return 0; } From 2ae05b8cf6e962a6a709678e54781ba8429666b4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Apr 2023 08:56:31 +0800 Subject: [PATCH 09/18] fix(stream): set the correct initial offset value. --- source/dnode/vnode/src/tq/tq.c | 7 +++++-- tests/system-test/1-insert/delete_childtable.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 190a0893a8..7c7f59b6b7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -130,6 +130,8 @@ int32_t tqInitialize(STQ* pTq) { return -1; } + // the version is kept in task's meta data + // todo check if this version is required or not if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) { return -1; } @@ -554,7 +556,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return 0; } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t UNUSED_PARAM(ver)) { +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; @@ -570,6 +572,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t UNUSED_PARAM(ver)) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; + pTask->chkInfo.version = ver; // expand executor if (pTask->fillHistory) { @@ -755,7 +758,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tDecoderClear(&decoder); - // 2.save task + // 2.save task, use the newest commit version as the initial start version of stream task. code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, diff --git a/tests/system-test/1-insert/delete_childtable.py b/tests/system-test/1-insert/delete_childtable.py index e3144edb45..a12f884981 100644 --- a/tests/system-test/1-insert/delete_childtable.py +++ b/tests/system-test/1-insert/delete_childtable.py @@ -27,7 +27,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) self.dbname = 'db_test' self.setsql = TDSetSql() self.stbname = 'stb' From 7d3e5aa3d389cf4ac165f0eacd6fb8115e58c679 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Apr 2023 09:03:30 +0800 Subject: [PATCH 10/18] fix(query): set the table schema correctly when the table is dropped. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 50 ++++++++++++++++---------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 89686c3d33..15a605151c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1881,8 +1881,8 @@ static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_ return pReader->pSchema; } - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1); - if (pReader->pSchema == NULL) { + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, uid, -1, 1, &pReader->pSchema); + if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) { tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr); } @@ -1890,9 +1890,15 @@ static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_ } static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { + int32_t code = 0; + // always set the newest schema version in pReader->pSchema if (pReader->pSchema == NULL) { - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1); + code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, uid, -1, 1, &pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } } if (pReader->pSchema && sversion == pReader->pSchema->version) { @@ -1905,7 +1911,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* } STSchema* ptr = NULL; - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); + code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -2014,6 +2020,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == k.ts) { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } + int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2222,6 +2232,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (pSchema == NULL) { return code; } + STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); if (piSchema == NULL) { return code; @@ -3843,11 +3854,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return terrno; } - if (pReader->pSchema == NULL) { - pReader->pSchema = pTSchema; - } - - code = tsdbRowMergerInit(&merge, pReader->pSchema, ¤t, pTSchema); + STSchema* ps = (pReader->pSchema != NULL)? pReader->pSchema:pTSchema; + code = tsdbRowMergerInit(&merge, ps, ¤t, pTSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3891,7 +3899,14 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } + STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); + if (piSchema == NULL) { + return terrno; + } if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema); @@ -4000,10 +4015,11 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT int64_t uid = pScanInfo->uid; int32_t code = TSDB_CODE_SUCCESS; - int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); - SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid); + if (pSchema == NULL) { + return terrno; + } SColVal colVal = {0}; int32_t i = 0, j = 0; @@ -5187,8 +5203,6 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { } int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { - int32_t sversion = 1; - SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); int32_t code = metaGetTableEntryByUidCache(&mr, uid); @@ -5200,6 +5214,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 *suid = 0; + // only child table and ordinary table is allowed, super table is not allowed. if (mr.me.type == TSDB_CHILD_TABLE) { tDecoderClear(&mr.coder); *suid = mr.me.ctbEntry.suid; @@ -5209,9 +5224,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 metaReaderClear(&mr); return terrno; } - sversion = mr.me.stbEntry.schemaRow.version; - } else if (mr.me.type == TSDB_NORMAL_TABLE) { - sversion = mr.me.ntbEntry.schemaRow.version; + } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing } else { terrno = TSDB_CODE_INVALID_PARA; metaReaderClear(&mr); @@ -5219,9 +5232,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 } metaReaderClear(&mr); - *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1); - return TSDB_CODE_SUCCESS; + // get the newest table schema version + code = metaGetTbTSchemaEx(pVnode->pMeta, uid, -1, 1, pSchema); + return code; } int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { From 97da34050b9300d1f19242ade11bc8a5dc7dbbaf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Apr 2023 10:40:53 +0800 Subject: [PATCH 11/18] fix(query): fix invalid free. --- source/dnode/vnode/src/tq/tqUtil.c | 17 +++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbRead.c | 8 ++++---- source/libs/executor/src/dataDeleter.c | 1 + source/libs/executor/src/dataInserter.c | 1 + source/libs/executor/src/executor.c | 4 +--- 5 files changed, 24 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 00bff5da5d..5ac747947f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -150,6 +150,21 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { + if (pRsp->blockData != NULL) { + pRsp->blockData = taosArrayDestroy(pRsp->blockData); + } + + if (pRsp->blockDataLen != NULL) { + pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); + } + + if (pRsp->blockTbName != NULL) { + pRsp->blockTbName = taosArrayDestroy(pRsp->blockTbName); + } + + if (pRsp->blockSchema != NULL) { + pRsp->blockSchema = taosArrayDestroy(pRsp->blockSchema); + } return -1; } @@ -283,6 +298,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { + tDeleteSTaosxRsp(&taosxRsp); return -1; } @@ -358,6 +374,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tDeleteSTaosxRsp(&taosxRsp); return code; } + code = 0; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 15a605151c..362dc51ad5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1881,7 +1881,7 @@ static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_ return pReader->pSchema; } - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, uid, -1, 1, &pReader->pSchema); + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema); if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) { tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr); } @@ -1894,7 +1894,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* // always set the newest schema version in pReader->pSchema if (pReader->pSchema == NULL) { - code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, uid, -1, 1, &pReader->pSchema); + code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -1975,7 +1975,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* // DESC: mem -----> imem -----> last block -----> file block if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { - init = true; + init = true; // todo check if pReader->pSchema is null or not int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -5234,7 +5234,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 metaReaderClear(&mr); // get the newest table schema version - code = metaGetTbTSchemaEx(pVnode->pMeta, uid, -1, 1, pSchema); + code = metaGetTbTSchemaEx(pVnode->pMeta, *suid, uid, -1, pSchema); return code; } diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index dc3bf83a91..d693faf7f1 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -240,6 +240,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); if (NULL == deleter) { + taosMemoryFree(pParam); code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index e9c46843c0..90d740bebd 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -408,6 +408,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat void* pParam) { SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle)); if (NULL == inserter) { + taosMemoryFree(pParam); terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 181ab3d44c..1732ec04a7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -517,10 +517,8 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, goto _error; } + // pSinkParam has been freed during create sinker. code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pSinkParam); - } } qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); From 6a9bdb98241d198c52e55b2f447ab4db46cc8c4e Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 18 Apr 2023 13:44:26 +0800 Subject: [PATCH 12/18] fix: duplicate calling shellWriteHistory --- tools/shell/src/shellEngine.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 910b067d4e..5ac32eaad9 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -58,7 +58,6 @@ int32_t shellRunSingleCommand(char *command) { } if (shellRegexMatch(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) { - shellWriteHistory(); return -1; } @@ -887,7 +886,6 @@ void shellWriteHistory() { } i = (i + 1) % SHELL_MAX_HISTORY_SIZE; } - taosFsyncFile(pFile); taosCloseFile(&pFile); } From 4417c79cb13e76e7a09a7a0252d917ec5af7eaa6 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 18 Apr 2023 14:03:24 +0800 Subject: [PATCH 13/18] fix: atoi on int64 config item (#20956) --- source/util/src/tconfig.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index f811d2f203..288ea6052b 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -187,7 +187,7 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st } static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - int64_t ival = (int64_t)atoi(value); + int64_t ival = (int64_t)atoll(value); if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax); From e58bf86c33958e54cb67966c1603cc6779232190 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 18 Apr 2023 16:34:06 +0800 Subject: [PATCH 14/18] fix local variable --- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ccb4cce199..5e2b1b33b9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4235,8 +4235,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } static void freeSchemaFunc(void* param) { - void* p = *(void**)param; - taosMemoryFreeClear(p); + void **p = (void **)param; + taosMemoryFreeClear(*p); } // ====================================== EXPOSED APIs ====================================== From 0822e82d6402552d74bc45a1dbcb717c4983fdf0 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 18 Apr 2023 17:12:04 +0800 Subject: [PATCH 15/18] feat: support exit by kill heart-beat thread mode --- include/client/taos.h | 3 +++ source/client/inc/clientInt.h | 1 + source/client/src/clientHb.c | 12 +++++++++++- tools/shell/src/shellMain.c | 3 +++ 4 files changed, 18 insertions(+), 1 deletion(-) diff --git a/include/client/taos.h b/include/client/taos.h index cf410a42da..a59e203644 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -225,6 +225,9 @@ DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *tabl DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); +// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner +DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill); + /* --------------------------schemaless INTERFACE------------------------------- */ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 41f87379a9..46d44d7443 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -80,6 +80,7 @@ typedef struct { int64_t appId; // ctl int8_t threadStop; + int8_t quitByKill; TdThread thread; TdThreadMutex lock; // used when app init and cleanup SHashObj* appSummary; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index c9c2e7a5f8..8d082ab60b 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -845,7 +845,12 @@ static void hbStopThread() { return; } - taosThreadJoin(clientHbMgr.thread, NULL); + // thread quit mode kill or inner exit from self-thread + if (clientHbMgr.quitByKill) { + taosThreadKill(clientHbMgr.thread, 0); + } else { + taosThreadJoin(clientHbMgr.thread, NULL); + } tscDebug("hb thread stopped"); } @@ -1037,3 +1042,8 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } + +// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner +void taos_set_hb_quit(int8_t quitByKill) { + clientHbMgr.quitByKill = quitByKill; +} diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index bc5809ffe8..795621dfdd 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -83,6 +83,9 @@ int main(int argc, char *argv[]) { #endif taos_init(); + // kill heart-beat thread when quit + taos_set_hb_quit(1); + if (shell.args.is_dump_config) { shellDumpConfig(); taos_cleanup(); From 298781dc55c6f20b2ae858c54bafcd2b4357c9c1 Mon Sep 17 00:00:00 2001 From: huolibo Date: Tue, 18 Apr 2023 17:45:08 +0800 Subject: [PATCH 16/18] fix: illegal accesses (#20960) (#20962) --- source/client/src/clientJniConnector.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientJniConnector.c b/source/client/src/clientJniConnector.c index d2a9665eee..b613354751 100644 --- a/source/client/src/clientJniConnector.c +++ b/source/client/src/clientJniConnector.c @@ -1259,8 +1259,9 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInse int code = taos_errno(tres); if (code != TSDB_CODE_SUCCESS) { jniError("jobj:%p, conn:%p, code: 0x%x, msg:%s", jobj, taos, code, taos_errstr(tres)); + jobject jobject = createSchemalessResp(env, 0, code, taos_errstr(tres)); taos_free_result(tres); - return createSchemalessResp(env, 0, code, taos_errstr(tres)); + return jobject; } taos_free_result(tres); @@ -1286,8 +1287,9 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInse int code = taos_errno(tres); if (code != TSDB_CODE_SUCCESS) { jniError("jobj:%p, conn:%p, code: 0x%x, msg:%s", jobj, taos, code, taos_errstr(tres)); + jobject jobject = createSchemalessResp(env, 0, code, taos_errstr(tres)); taos_free_result(tres); - return createSchemalessResp(env, 0, code, taos_errstr(tres)); + return jobject; } taos_free_result(tres); @@ -1315,8 +1317,9 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInse int code = taos_errno(tres); if (code != TSDB_CODE_SUCCESS) { jniError("jobj:%p, conn:%p, code: 0x%x, msg:%s", jobj, taos, code, taos_errstr(tres)); + jobject jobject = createSchemalessResp(env, 0, code, taos_errstr(tres)); taos_free_result(tres); - return createSchemalessResp(env, 0, code, taos_errstr(tres)); + return jobject; } taos_free_result(tres); @@ -1343,8 +1346,9 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInse int code = taos_errno(tres); if (code != TSDB_CODE_SUCCESS) { jniError("jobj:%p, conn:%p, code: 0x%x, msg:%s", jobj, taos, code, taos_errstr(tres)); + jobject jobject = createSchemalessResp(env, 0, code, taos_errstr(tres)); taos_free_result(tres); - return createSchemalessResp(env, 0, code, taos_errstr(tres)); + return jobject; } taos_free_result(tres); From fb32ffd96029006653ce0e44f7c3e8ef5d91700d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Apr 2023 18:06:03 +0800 Subject: [PATCH 17/18] fix(stream): update the reference count value to be int32, insead of int8 --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tqRestore.c | 24 +++++++++++++++--------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index aade34e965..ba349e11f1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -345,7 +345,7 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; SRWLatch lock; - int8_t walScan; + int32_t walScan; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index cba51cdee4..3a4bb65c0a 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -27,31 +27,37 @@ int tqStreamTasksScanWal(STQ* pTq) { int64_t st = taosGetTimestampMs(); while (1) { - tqInfo("vgId:%d continue check if data in wal are available", vgId); + int32_t scan = pMeta->walScan; + tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan); // check all restore tasks - bool allFull = true; - streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull); + bool shouldIdle = true; + streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle); int32_t times = 0; - if (allFull) { + if (shouldIdle) { taosWLockLatch(&pMeta->lock); pMeta->walScan -= 1; times = pMeta->walScan; + ASSERT(pMeta->walScan >= 0); + if (pMeta->walScan <= 0) { taosWUnLockLatch(&pMeta->lock); break; } taosWUnLockLatch(&pMeta->lock); - tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times); + tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times); } } - double el = (taosGetTimestampMs() - st) / 1000.0; - tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el); + int64_t el = (taosGetTimestampMs() - st); + tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el); + + // restore wal scan flag +// atomic_store_8(&pTq->pStreamMeta->walScan, 0); return 0; } @@ -96,8 +102,8 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto continue; } - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { + if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || + pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); continue; From 4a9d62b467f7f246650c8a83cb7f04ed5ae3d988 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 19 Apr 2023 10:18:41 +0800 Subject: [PATCH 18/18] enh: ignore single row null data type validation --- source/common/src/tdataformat.c | 8 +++++--- source/libs/parser/src/parInsertStmt.c | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index d6ab974c6c..f379084cf5 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2503,9 +2503,11 @@ _exit: int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) { int32_t code = 0; - ASSERT(pColData->type == pBind->buffer_type); - - if (IS_VAR_DATA_TYPE(pBind->buffer_type)) { // var-length data type + if (!(pBind->num == 1 && pBind->is_null && *pBind->is_null)) { + ASSERT(pColData->type == pBind->buffer_type); + } + + if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type for (int32_t i = 0; i < pBind->num; ++i) { if (pBind->is_null && pBind->is_null[i]) { code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 01a635e4b2..922a0f45ff 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -251,7 +251,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in goto _return; } - if (bind[c].buffer_type != pColSchema->type) { + if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); goto _return; }