diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 16e1a1c395..65244ec11a 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -64,7 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); * @param SReadHandle * @return */ -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper); /** * Set the input data block for the stream scan. diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 7927c1e008..2caf81317d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -826,7 +826,7 @@ TEST(testCase, update_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); + TAOS_RES* pRes = taos_query(pConn, "select cast(0 as timestamp)-1y"); if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { printf("failed to create database, code:%s", taos_errstr(pRes)); taos_free_result(pRes); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c674728fe6..1792a18c07 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1163,9 +1163,15 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; + if (pColumn->varmeta.offset > 0) { + memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows); + } } else { if (pColumn->nullbitmap != NULL) { memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); + if (pColumn->pData != NULL) { + memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows); + } } } } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index abac77dc01..c2006a2535 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,7 +88,8 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; - int32_t numOfCols; // number of out pout column, temporarily used + int32_t numOfCols; // number of out pout column, temporarily used + SSchemaWrapper *pSchemaWrapper; // columns that are involved in query } STqExecHandle; typedef struct { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 054a8b6edf..cecf899591 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -579,9 +579,11 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche while (1) { SSDataBlock *output = NULL; uint64_t ts; - if (qExecTask(pItem->taskInfo, &output, &ts) < 0) { + + int32_t code = qExecTask(pItem->taskInfo, &output, &ts); + if (code < 0) { smaError("vgId:%d, qExecTask for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma), suid, - pItem->level, terrstr()); + pItem->level, terrstr(code)); goto _err; } if (!output) { @@ -597,35 +599,36 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche } taosArrayPush(pResult, output); - } - if (taosArrayGetSize(pResult) > 0) { + if (taosArrayGetSize(pResult) > 0) { #if 1 - char flag[10] = {0}; - snprintf(flag, 10, "level %" PRIi8, pItem->level); - blockDebugShowDataBlocks(pResult, flag); + char flag[10] = {0}; + snprintf(flag, 10, "level %" PRIi8, pItem->level); + blockDebugShowDataBlocks(pResult, flag); #endif - STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq *pReq = NULL; - // TODO: the schema update should be handled - if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma), - suid, pItem->level, terrstr()); - goto _err; - } + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + SSubmitReq *pReq = NULL; + // TODO: the schema update should be handled + if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { + smaError("vgId:%d, build submit req for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma), + suid, pItem->level, terrstr()); + goto _err; + } + + if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pStat->submitVer, 1), pReq) < 0) { + taosMemoryFreeClear(pReq); + smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); + goto _err; + } - if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pStat->submitVer, 1), pReq) < 0) { taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), - suid, pItem->level, terrstr()); - goto _err; + taosArrayClear(pResult); + } else if (terrno == 0) { + smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); + } else { + smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno)); } - - taosMemoryFreeClear(pReq); - } else if (terrno == 0) { - smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); - } else { - smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno)); } tdDestroySDataBlockArray(pResult); @@ -1364,4 +1367,4 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { _end: tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5469960ce6..23514c8c76 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -526,8 +526,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .initTqReader = true, .version = ver, }; - pHandle->execHandle.execCol.task[i] = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); + pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, + &pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 468490350a..83e852c79e 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -93,7 +93,8 @@ int32_t tqMetaOpen(STQ* pTq) { .version = handle.snapshotVer, }; - handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols); + handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, + &handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5dc1915bff..bad1037123 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1943,17 +1943,20 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pDelFile) { SDelFReader* pDelFReader = NULL; code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); - if (code) { + if (code != TSDB_CODE_SUCCESS) { goto _err; } SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx)); if (aDelIdx == NULL) { + tsdbDelFReaderClose(&pDelFReader); goto _err; } code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL); - if (code) { + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(aDelIdx); + tsdbDelFReaderClose(&pDelFReader); goto _err; } @@ -1962,9 +1965,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pIdx != NULL) { code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + } + + taosArrayDestroy(aDelIdx); + tsdbDelFReaderClose(&pDelFReader); + + if (code != TSDB_CODE_SUCCESS) { + goto _err; } } @@ -2532,8 +2539,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); - - if (pDumpInfo->rowIndex >= pBlock->nRow) { + if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) { *state = CHECK_FILEBLOCK_CONT; } } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2cc4058b3b..b36a5ebdd1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -164,6 +164,7 @@ typedef struct { char* dbname; int32_t tversion; SSchemaWrapper* sw; + SSchemaWrapper* qsw; } SSchemaInfo; typedef struct SExecTaskInfo { @@ -868,7 +869,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); + SExecTaskInfo* pTaskInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ff7d934c4b..b00dc9dba5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -120,7 +120,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO return code; } -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols) { +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper) { if (msg == NULL) { // TODO create raw scan return NULL; @@ -154,6 +154,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n } } + *pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw); return pTaskInfo; } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index c3efbf9336..e0020a496e 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -199,6 +199,10 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { void qDestroyTask(qTaskInfo_t qTaskHandle) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle; + if (pTaskInfo == NULL) { + return; + } + qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); queryCostStatis(pTaskInfo); // print the query cost summary diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 292e435732..c42d477b33 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1647,11 +1647,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } -int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) { - int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows); - return pBlock->info.rows; -} - void queryCostStatis(SExecTaskInfo* pTaskInfo) { STaskCostInfo* pSummary = &pTaskInfo->cost; @@ -4147,35 +4142,62 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea static SArray* extractColumnInfo(SNodeList* pNodeList); -int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { +SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); + +int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); - int32_t code = metaGetTableEntryByUid(&mr, uid); + int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid); if (code != TSDB_CODE_SUCCESS) { + qError("failed to get the table meta, uid:0x%"PRIx64", suid:0x%"PRIx64 ", %s", pScanNode->uid, pScanNode->suid, + GET_TASKID(pTaskInfo)); + metaReaderClear(&mr); return terrno; } - pTaskInfo->schemaInfo.tablename = strdup(mr.me.name); + SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo; + pSchemaInfo->tablename = strdup(mr.me.name); if (mr.me.type == TSDB_SUPER_TABLE) { - pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; } else if (mr.me.type == TSDB_CHILD_TABLE) { tDecoderClear(&mr.coder); tb_uid_t suid = mr.me.ctbEntry.suid; metaGetTableEntryByUid(&mr, suid); - pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; } else { - pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } metaReaderClear(&mr); + + pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode); return TSDB_CODE_SUCCESS; } +SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { + int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols); + SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + pqSw->pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema)); + + for(int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); + SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + + SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++]; + pSchema->colId = pColNode->colId; + pSchema->type = pColNode->node.resType.type; + pSchema->type = pColNode->node.resType.bytes; + strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); + } + + return pqSw; +} + static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { taosMemoryFreeClear(pSchemaInfo->dbname); if (pSchemaInfo->sw == NULL) { @@ -4183,8 +4205,8 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { } taosMemoryFree(pSchemaInfo->tablename); - taosMemoryFree(pSchemaInfo->sw->pSchema); - taosMemoryFree(pSchemaInfo->sw); + tDeleteSSchemaWrapper(pSchemaInfo->sw); + tDeleteSSchemaWrapper(pSchemaInfo->qsw); } static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { @@ -4385,7 +4407,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo); + code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); if (code) { pTaskInfo->code = terrno; return NULL; @@ -4405,7 +4427,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo); + code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); if (code) { pTaskInfo->code = terrno; return NULL; @@ -4422,11 +4444,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - STimeWindowAggSupp twSup = { - .waterMark = pTableScanNode->watermark, - .calTrigger = pTableScanNode->triggerType, - .maxTs = INT64_MIN, - }; if (pHandle->vnode) { int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); @@ -4436,7 +4453,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo, &twSup); + pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { @@ -4487,7 +4505,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); + code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 11aac2114d..574aa648e5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1525,7 +1525,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) { + SExecTaskInfo* pTaskInfo) { SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -1539,6 +1539,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pTagCond = pTagCond; + pInfo->twAggSup = (STimeWindowAggSupp){ + .waterMark = pTableScanNode->watermark, + .calTrigger = pTableScanNode->triggerType, + .maxTs = INT64_MIN, + }; + int32_t numOfCols = 0; pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); @@ -1591,7 +1597,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } if (pTSInfo->interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pTwSup->waterMark); + pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pInfo->twAggSup.waterMark); } else { pInfo->pUpdateInfo = NULL; } @@ -1631,7 +1637,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->deleteDataIndex = 0; pInfo->pDeleteDataRes = createPullDataBlock(); pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; - pInfo->twAggSup = *pTwSup; pOperator->name = "StreamScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 31c079e55f..90ffff5faf 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -66,12 +66,32 @@ static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) { static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey); +static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) { + if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + colDataAppend(pDst, rowIndex, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + colDataAppend(pDst, rowIndex, (char*)&v, false); + } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + colDataAppend(pDst, rowIndex, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + colDataAppend(pDst, rowIndex, (const char*)¤tKey, false); + } else { // varchar/nchar data + colDataAppendNULL(pDst, rowIndex); + } +} + static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts, bool outOfBound) { SPoint point1, point2, point; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); -// set the primary timestamp column value + // set the primary timestamp column value int32_t index = pBlock->info.rows; // set the other values @@ -160,30 +180,13 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* } else { // fill with user specified value for each column for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag) /* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) { + if (TSDB_COL_IS_TAG(pCol->flag)) { continue; } SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; - SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); - if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - colDataAppend(pDst, index, (char*)&v, false); - } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { - double v = 0; - GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - colDataAppend(pDst, index, (char*)&v, false); - } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - colDataAppend(pDst, index, (char*)&v, false); - } else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false); - } else { // varchar/nchar data - colDataAppendNULL(pDst, index); - } + doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey); } } @@ -273,7 +276,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t return outputRows; } } else { - assert(pFillInfo->currentKey == ts); + ASSERT(pFillInfo->currentKey == ts); int32_t index = pBlock->info.rows; if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) { @@ -295,27 +298,32 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); char* src = colDataGetData(pSrc, pFillInfo->index); - if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*|| - (pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) { + if (/*i == 0 || (*/!colDataIsNull_s(pSrc, pFillInfo->index)) { bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); colDataAppend(pDst, index, src, isNull); saveColData(pFillInfo->prev, i, src, isNull); - } else { // i > 0 and data is null , do interpolation - if (pFillInfo->type == TSDB_FILL_PREV) { - SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i); - doSetVal(pDst, index, pKey); - } else if (pFillInfo->type == TSDB_FILL_LINEAR) { - bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); - colDataAppend(pDst, index, src, isNull); - saveColData(pFillInfo->prev, i, src, isNull); - } else if (pFillInfo->type == TSDB_FILL_NULL) { - colDataAppendNULL(pDst, index); - } else if (pFillInfo->type == TSDB_FILL_NEXT) { - SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i); - doSetVal(pDst, index, pKey); - } else { - SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; - colDataAppend(pDst, index, (char*)&pVar->i, false); + } else { + if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false); + } else { // i > 0 and data is null , do interpolation + if (pFillInfo->type == TSDB_FILL_PREV) { + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; + SGroupKeys* pKey = taosArrayGet(p, i); + doSetVal(pDst, index, pKey); + } else if (pFillInfo->type == TSDB_FILL_LINEAR) { + bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); + colDataAppend(pDst, index, src, isNull); + saveColData(pFillInfo->prev, i, src, isNull); // todo: + } else if (pFillInfo->type == TSDB_FILL_NULL) { + colDataAppendNULL(pDst, index); + } else if (pFillInfo->type == TSDB_FILL_NEXT) { + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev; + SGroupKeys* pKey = taosArrayGet(p, i); + doSetVal(pDst, index, pKey); + } else { + SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; + doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey); + } } } } diff --git a/tests/script/tsim/insert/dupinsert.sim b/tests/script/tsim/insert/dupinsert.sim new file mode 100644 index 0000000000..0526cc19e4 --- /dev/null +++ b/tests/script/tsim/insert/dupinsert.sim @@ -0,0 +1,176 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print =============== create database +sql drop database if exists d0 +sql create database d0 keep 365000d,365000d,365000d +sql use d0 + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int unsigned, c2 double, c3 binary(10), c4 nchar(10), c5 double) tags (city binary(20),district binary(20)); + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +sql create table ct1 using stb tags("BeiJing", "ChaoYang") +sql create table ct2 using stb tags("BeiJing", "HaiDian") +sql create table ct3 using stb tags("BeiJing", "PingGu") +sql create table ct4 using stb tags("BeiJing", "YanQing") + +sql show tables +if $rows != 4 then + print rows $rows != 4 + return -1 +endi + +print =============== step 1 insert records into ct1 - taosd merge +sql insert into ct1(ts,c1,c2) values('2022-05-03 16:59:00.010', 10, 20); +sql insert into ct1(ts,c1,c2,c3,c4) values('2022-05-03 16:59:00.011', 11, NULL, 'binary', 'nchar'); +sql insert into ct1 values('2022-05-03 16:59:00.016', 16, NULL, NULL, 'nchar', NULL); +sql insert into ct1 values('2022-05-03 16:59:00.016', 17, NULL, NULL, 'nchar', 170); +sql insert into ct1 values('2022-05-03 16:59:00.020', 20, NULL, NULL, 'nchar', 200); +sql insert into ct1 values('2022-05-03 16:59:00.016', 18, NULL, NULL, 'nchar', 180); +sql insert into ct1 values('2022-05-03 16:59:00.021', 21, NULL, NULL, 'nchar', 210); +sql insert into ct1 values('2022-05-03 16:59:00.022', 22, NULL, NULL, 'nchar', 220); + +print =============== step 2 insert records into ct1/ct2 - taosc merge for 2022-05-03 16:59:00.010 +sql insert into ct1(ts,c1,c2) values('2022-05-03 16:59:00.010', 10,10), ('2022-05-03 16:59:00.010',20,10.0), ('2022-05-03 16:59:00.010',30,NULL) ct2(ts,c1) values('2022-05-03 16:59:00.010',10), ('2022-05-03 16:59:00.010',20) ct1(ts,c2) values('2022-05-03 16:59:00.010',10), ('2022-05-03 16:59:00.010',100) ct1(ts,c3) values('2022-05-03 16:59:00.010','bin1'), ('2022-05-03 16:59:00.010','bin2') ct1(ts,c4,c5) values('2022-05-03 16:59:00.010',NULL,NULL), ('2022-05-03 16:59:00.010','nchar4',1000.01) ct2(ts,c2,c3,c4,c5) values('2022-05-03 16:59:00.010',20,'xkl','zxc',10); + +print =============== step 3 insert records into ct3 +sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.020', 10,10); +sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.021', 10,10), ('2022-05-03 16:59:00.021',20,20.0); +sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.022', 30,30), ('2022-05-03 16:59:00.022',40,40.0),('2022-05-03 16:59:00.022',50,50.0); +sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.023', 60,60), ('2022-05-03 16:59:00.023',70,70.0),('2022-05-03 16:59:00.023',80,80.0), ('2022-05-03 16:59:00.023',90,90.0); +sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.024', 100,100), ('2022-05-03 16:59:00.025',110,110.0),('2022-05-03 16:59:00.025',120,120.0), ('2022-05-03 16:59:00.025',130,130.0); +sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.030', 140,140), ('2022-05-03 16:59:00.030',150,150.0),('2022-05-03 16:59:00.031',160,160.0), ('2022-05-03 16:59:00.030',170,170.0), ('2022-05-03 16:59:00.031',180,180.0); +sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.042', 190,190), ('2022-05-03 16:59:00.041',200,200.0),('2022-05-03 16:59:00.040',210,210.0); +sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.050', 220,220), ('2022-05-03 16:59:00.051',230,230.0),('2022-05-03 16:59:00.052',240,240.0); + +print =============== step 4 insert records into ct4 +sql insert into ct4(ts,c1,c3,c4) values('2022-05-03 16:59:00.020', 10,'b0','n0'); +sql insert into ct4(ts,c1,c3,c4) values('2022-05-03 16:59:00.021', 20,'b1','n1'), ('2022-05-03 16:59:00.021',30,'b2','n2'); +sql insert into ct4(ts,c1,c3,c4) values('2022-05-03 16:59:00.022', 40,'b3','n3'), ('2022-05-03 16:59:00.022',40,'b4','n4'),('2022-05-03 16:59:00.022',50,'b5','n5'); +sql insert into ct4(ts,c1,c3,c4) values('2022-05-03 16:59:00.023', 60,'b6','n6'), ('2022-05-03 16:59:00.024',70,'b7','n7'),('2022-05-03 16:59:00.024',80,'b8','n8'), ('2022-05-03 16:59:00.023',90,'b9','n9'); + + + +print =============== step 5 query records of ct1 from memory(taosc and taosd merge) +sql select * from ct1; +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + + + +print =============== step 6 query records of ct2 from memory(taosc and taosd merge) +sql select * from ct2; +print $data00 $data01 $data02 $data03 $data04 $data05 + +if $rows != 1 then + print rows $rows != 1 + return -1 +endi + + + +print =============== step 7 query records of ct3 from memory +sql select * from ct3; +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 +print $data60 $data61 $data62 $data63 $data64 $data65 +print $data70 $data71 $data72 $data73 $data74 $data75 +print $data80 $data81 $data82 $data83 $data84 $data85 +print $data90 $data91 $data92 $data93 $data94 $data95 +print $data[10][0] $data[10][1] $data[10][2] $data[10][3] $data[10][4] $data[10][5] +print $data[11][0] $data[11][1] $data[11][2] $data[11][3] $data[11][4] $data[11][5] +print $data[12][0] $data[12][1] $data[12][2] $data[12][3] $data[12][4] $data[12][5] +print $data[13][0] $data[13][1] $data[13][2] $data[13][3] $data[13][4] $data[13][5] + +if $rows != 14 then + print rows $rows != 14 + return -1 +endi + + +print =============== step 8 query records of ct4 from memory +sql select * from ct4; +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 + + +if $rows != 5 then + print rows $rows != 5 + return -1 +endi + + +#==================== reboot to trigger commit data to file +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + + +print =============== step 9 query records of ct1 from file +sql select * from ct1; +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +if $rows != 6 then + print rows $rows != 6 + return -1 +endi + + +print =============== step 10 query records of ct2 from file +sql select * from ct2; +print $data00 $data01 $data02 $data03 $data04 $data05 + +if $rows != 1 then + print rows $rows != 1 + return -1 +endi + + +print =============== step 11 query records of ct3 from file +sql select * from ct3; +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 +print $data60 $data61 $data62 $data63 $data64 $data65 +print $data70 $data71 $data72 $data73 $data74 $data75 +print $data80 $data81 $data82 $data83 $data84 $data85 +print $data90 $data91 $data92 $data93 $data94 $data95 +print $data[10][0] $data[10][1] $data[10][2] $data[10][3] $data[10][4] $data[10][5] +print $data[11][0] $data[11][1] $data[11][2] $data[11][3] $data[11][4] $data[11][5] +print $data[12][0] $data[12][1] $data[12][2] $data[12][3] $data[12][4] $data[12][5] +print $data[13][0] $data[13][1] $data[13][2] $data[13][3] $data[13][4] $data[13][5] + + +print =============== step 12 query records of ct4 from file +sql select * from ct4; +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 \ No newline at end of file diff --git a/tools/taos-tools b/tools/taos-tools index 69b558ccbf..0b8a3373bb 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 69b558ccbfe54a4407fe23eeae2e67c540f59e55 +Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a diff --git a/tools/taosws-rs b/tools/taosws-rs index 267a96fb09..c5fded266d 160000 --- a/tools/taosws-rs +++ b/tools/taosws-rs @@ -1 +1 @@ -Subproject commit 267a96fb09fc2ba14acfa47f7d3678def64c29c5 +Subproject commit c5fded266d3b10508e38bf3285bb7ecf798bc343