diff --git a/example/src/tmq.c b/example/src/tmq.c
index 976d658fa6..2ee91c254c 100644
--- a/example/src/tmq.c
+++ b/example/src/tmq.c
@@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) {
int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields);
printf("%s\n", buf);
+
+ const char* tbName = tmq_get_table_name(msg);
+ if (tbName) {
+ printf("from tb: %s\n", tbName);
+ }
}
}
@@ -101,8 +106,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
- pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
- /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
+ /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
+ pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
diff --git a/include/client/taos.h b/include/client/taos.h
index 26d4d18234..486d5f5fef 100644
--- a/include/client/taos.h
+++ b/include/client/taos.h
@@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
-// TODO
-#if 0
-DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
-#endif
+DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
#if 0
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 7f17c1673b..d655b82a08 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -2489,6 +2489,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
tlen += taosEncodeSSchemaWrapper(buf, pSW);
}
+ if (pRsp->withTbName) {
+ char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i);
+ tlen += taosEncodeString(buf, tbName);
+ }
}
}
return tlen;
@@ -2501,6 +2505,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
+ pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) {
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
@@ -2519,6 +2524,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW);
}
+ if (pRsp->withTbName) {
+ char* name = NULL;
+ buf = taosDecodeString(buf, &name);
+ taosArrayPush(pRsp->blockTbName, &name);
+ }
}
}
return (void*)buf;
diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c
index 0ce689f19c..b42f072e54 100644
--- a/source/client/src/tmq.c
+++ b/source/client/src/tmq.c
@@ -1346,3 +1346,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
return -1;
}
}
+
+const char* tmq_get_table_name(TAOS_RES* res) {
+ if (TD_RES_TMQ(res)) {
+ SMqRspObj* pRspObj = (SMqRspObj*)res;
+ if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
+ pRspObj->resIter >= pRspObj->rsp.blockNum) {
+ return NULL;
+ }
+ const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
+ return name;
+ }
+ return NULL;
+}
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index ca0ae111a3..12e89277f4 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -318,6 +318,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
pOld->nextColId = pNew->nextColId;
+ pOld->ttl = pNew->ttl;
pOld->numOfColumns = pNew->numOfColumns;
pOld->numOfTags = pNew->numOfTags;
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
@@ -832,7 +833,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) {
}
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
- if (pAlter->commentLen != 0) return 0;
+ if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0;
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
@@ -883,7 +884,8 @@ static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
return 0;
}
-static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen) {
+static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
+ int32_t ttl) {
if (commentLen > 0) {
pNew->commentLen = commentLen;
pNew->comment = taosMemoryCalloc(1, commentLen);
@@ -893,6 +895,9 @@ static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pCo
}
memcpy(pNew->comment, pComment, commentLen);
}
+ if (ttl >= 0) {
+ pNew->ttl = ttl;
+ }
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
@@ -1232,7 +1237,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq *
code = mndAlterStbColumnBytes(pOld, &stbObj, pField0);
break;
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
- code = mndUpdateStbComment(pOld, &stbObj, pAlter->comment, pAlter->commentLen);
+ code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
break;
default:
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
@@ -1723,7 +1728,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
- char *p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures
+ char *p = taosMemoryCalloc(1, pStb->commentLen + 1 + VARSTR_HEADER_SIZE); // check malloc failures
if (p != NULL) {
if (pStb->commentLen != 0) {
STR_TO_VARSTR(p, pStb->comment);
diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c
index 41d4d5f406..00379ecda1 100644
--- a/source/dnode/mnode/impl/src/mndTopic.c
+++ b/source/dnode/mnode/impl/src/mndTopic.c
@@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
- topicObj.withTbName = 0;
- topicObj.withSchema = 0;
+ topicObj.withTbName = pCreate->withTbName;
+ topicObj.withSchema = pCreate->withSchema;
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h
index 1e271bc8d8..f1917d5fea 100644
--- a/source/dnode/vnode/src/inc/meta.h
+++ b/source/dnode/vnode/src/inc/meta.h
@@ -116,10 +116,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
// SMetaDB
int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);
-int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
-int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
-int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
-int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
+// int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
+int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
+int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
+int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
#endif
#endif
@@ -128,4 +128,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
}
#endif
-#endif /*_TD_VNODE_META_H_*/
\ No newline at end of file
+#endif /*_TD_VNODE_META_H_*/
diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c
index 8d2a4ebcf3..7d0a87d79d 100644
--- a/source/dnode/vnode/src/meta/metaQuery.c
+++ b/source/dnode/vnode/src/meta/metaQuery.c
@@ -158,7 +158,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
skmDbKey.sver = sver;
pKey = &skmDbKey;
kLen = sizeof(skmDbKey);
+ metaRLock(pMeta);
ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
+ metaULock(pMeta);
if (ret < 0) {
return NULL;
}
@@ -181,6 +183,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
}
struct SMCtbCursor {
+ SMeta *pMeta;
TDBC *pCur;
tb_uid_t suid;
void *pKey;
@@ -200,9 +203,13 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
return NULL;
}
+ pCtbCur->pMeta = pMeta;
pCtbCur->suid = uid;
+ metaRLock(pMeta);
+
ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
if (ret < 0) {
+ metaULock(pMeta);
taosMemoryFree(pCtbCur);
return NULL;
}
@@ -220,6 +227,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
if (pCtbCur) {
+ if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) {
tdbDbcClose(pCtbCur->pCur);
@@ -269,7 +277,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
pSW = metaGetTableSchema(pMeta, quid, sver, 0);
if (!pSW) return NULL;
-
+
tdInitTSchemaBuilder(&sb, 0);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index b49c148ac5..dac1caff69 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -427,9 +427,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.withSchema = pExec->withSchema;
+ rsp.withTbName = pExec->withTbName;
+
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
+ rsp.blockTbName = taosArrayInit(0, sizeof(void*));
while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch);
@@ -535,6 +538,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush(rsp.blockSchema, &pSW);
}
+ if (pExec->withTbName) {
+ SMetaReader mr = {0};
+ metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
+ int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
+ if (metaGetTableEntryByUid(&mr, uid) < 0) {
+ ASSERT(0);
+ }
+ char* tbName = strdup(mr.me.name);
+ taosArrayPush(rsp.blockTbName, &tbName);
+ metaReaderClear(&mr);
+ }
+
rsp.blockNum++;
}
// db subscribe
@@ -563,6 +578,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
+ if (pExec->withTbName) {
+ SMetaReader mr = {0};
+ metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
+ if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
+ ASSERT(0);
+ }
+ char* tbName = strdup(mr.me.name);
+ taosArrayPush(rsp.blockTbName, &tbName);
+ metaReaderClear(&mr);
+ }
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(rsp.blockSchema, &pSW);
@@ -614,6 +639,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
+ taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
return 0;
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c
index 031d200a66..c1813c787a 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCommit.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c
@@ -70,6 +70,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
+static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
@@ -485,8 +486,10 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
for (int i = 1; i < pCommith->niters; i++) {
tSkipListDestroyIter(pCommith->iters[i].pIter);
- tdFreeSchema(pCommith->iters[i].pTable->pSchema);
- taosMemoryFree(pCommith->iters[i].pTable);
+ if (pCommith->iters[i].pTable) {
+ tdFreeSchema(pCommith->iters[i].pTable->pSchema);
+ taosMemoryFreeClear(pCommith->iters[i].pTable);
+ }
}
taosMemoryFree(pCommith->iters);
@@ -889,9 +892,11 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
}
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
- SReadH *pReadh = &pCommith->readh;
- int nBlocks = pIdx->numOfBlocks;
- int bidx = 0;
+ SReadH *pReadh = &pCommith->readh;
+ STsdb *pTsdb = TSDB_READ_REPO(pReadh);
+ STSchema *pTSchema = NULL;
+ int nBlocks = pIdx->numOfBlocks;
+ int bidx = 0;
tsdbResetCommitTable(pCommith);
@@ -901,30 +906,49 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
return -1;
}
+ STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
+ pCommith->pTable = &table;
+
while (bidx < nBlocks) {
+ if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
+ // Set commit table
+ pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 0); // TODO: schema version
+ if (!pTSchema) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
+ table.pSchema = pTSchema;
+ if (tsdbSetCommitTable(pCommith, &table) < 0) {
+ taosMemoryFreeClear(pTSchema);
+ return -1;
+ }
+ }
+
if (tsdbMoveBlock(pCommith, bidx) < 0) {
tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
+ taosMemoryFreeClear(pTSchema);
return -1;
}
+
++bidx;
}
- STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
- TSDB_COMMIT_TABLE(pCommith) = &table;
-
if (tsdbWriteBlockInfo(pCommith) < 0) {
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
+ taosMemoryFreeClear(pTSchema);
return -1;
}
+ taosMemoryFreeClear(pTSchema);
return 0;
}
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
+
pCommith->pTable = pTable;
if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) {
@@ -1321,6 +1345,14 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
return 0;
}
+static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) {
+ SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
+ if (pBlock->last) {
+ return pCommith->isLFileSame;
+ }
+ return pCommith->isDFileSame;
+}
+
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
SDFile *pDFile;
diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c
index d40a73eb67..1e8fbb48c7 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c
@@ -62,6 +62,16 @@ int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
if (pMemTable) {
taosHashCleanup(pMemTable->pHashIdx);
+ SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx);
+ SSkipListNode *pNode = NULL;
+ STbData *pTbData = NULL;
+ for (;;) {
+ if (!tSkipListIterNext(pIter)) break;
+ pNode = tSkipListIterGet(pIter);
+ pTbData = (STbData *)pNode->pData;
+ tsdbFreeTbData(pTbData);
+ }
+ tSkipListDestroyIter(pIter);
tSkipListDestroy(pMemTable->pSlIdx);
taosMemoryFree(pMemTable);
}
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 1260f9a3e7..f0f5338c4d 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -72,6 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t ret = 0;
SMsgCb *pMsgCb = rpcHandle;
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
+ pMsg->noResp = 1;
tmsgSendReq(rpcHandle, pEpSet, pMsg);
} else {
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 93e81aa70e..4881f23134 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -616,10 +616,10 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
-void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
+void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
-void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
+void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
diff --git a/source/libs/executor/inc/indexoperator.h b/source/libs/executor/inc/indexoperator.h
index 9e67ac7f41..d033c63ef8 100644
--- a/source/libs/executor/inc/indexoperator.h
+++ b/source/libs/executor/inc/indexoperator.h
@@ -13,11 +13,23 @@
* along with this program. If not, see .
*/
-#include "filter.h"
+#ifndef _INDEX_OPERATOR_H
+#define _INDEX_OPERATOR_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+#include "nodes.h"
#include "tglobal.h"
typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus;
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode);
// construct tag filter operator later
-int32_t doFilterTag(const SNode *pFilterNode, SArray *resutl);
+int32_t doFilterTag(const SNode *pFilterNode, SArray *result);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*INDEX_OPERATOR_*/
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 33f0c440ec..a5bc1fdf58 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
-static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
+static int32_t doCopyToSDataBlock(SExecTaskInfo *taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset,
SqlFunctionCtx* pCtx);
@@ -579,7 +579,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}
-void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
+void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey;
@@ -618,9 +618,14 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
pEntryInfo->numOfRes = 1;
continue;
}
-
+ int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
- pCtx[k].fpSet.process(&pCtx[k]);
+ code = pCtx[k].fpSet.process(&pCtx[k]);
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
+ taskInfo->code = code;
+ longjmp(taskInfo->env, code);
+ }
}
// restore it
@@ -806,7 +811,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
// this can be set during create the struct
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) {
- pCtx[k].fpSet.process(&pCtx[k]);
+ int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s call aggregate function error happens, code : %s",
+ GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
+ pOperator->pTaskInfo->code = code;
+ longjmp(pOperator->pTaskInfo->env, code);
+ }
}
}
}
@@ -2176,7 +2187,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
* @param pQInfo
* @param result
*/
-int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
+int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
@@ -2215,8 +2226,14 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
- if (pCtx[j].fpSet.process) {
- pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
+ if (pCtx[j].fpSet.finalize) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
+ if (TAOS_FAILED(code)) {
+ qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code));
+ taskInfo->code = code;
+ longjmp(taskInfo->env, code);
+ }
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
@@ -2243,7 +2260,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
return 0;
}
-void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
+void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
@@ -2257,7 +2274,7 @@ void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo
}
int32_t orderType = TSDB_ORDER_ASC;
- doCopyToSDataBlock(pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx);
+ doCopyToSDataBlock(taskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx);
// add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow(pBlock);
@@ -3749,7 +3766,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
- doBuildResultDatablock(pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pTaskInfo, pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index e3a507bf7c..5d22c13ec6 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -234,7 +234,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t rowIndex = j - num;
- doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
+ doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
// assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
@@ -252,7 +252,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t rowIndex = pBlock->info.rows - num;
- doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
+ doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
}
}
@@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
- doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
@@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
while(1) {
- doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c
index fe30ebb2ea..c17fcacf1f 100644
--- a/source/libs/executor/src/indexoperator.c
+++ b/source/libs/executor/src/indexoperator.c
@@ -583,7 +583,7 @@ int32_t doFilterTag(const SNode *pFilterNode, SArray *result) {
SFilterInfo *filter = NULL;
// todo move to the initialization function
- SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
+ // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
SIFParam param = {0};
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m));
@@ -598,9 +598,9 @@ SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) {
if (pFilterNode == NULL) {
return SFLT_NOT_INDEX;
}
- SFilterInfo *filter = NULL;
+ // SFilterInfo *filter = NULL;
// todo move to the initialization function
- SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
+ // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
SIF_ERR_RET(sifGetFltHint((SNode *)pFilterNode, &st));
return st;
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 738f4821bd..0f3b1bda20 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -703,7 +703,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
pInfo->order, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
- doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
+ doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
STimeWindow nextWin = win;
@@ -740,7 +740,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
pInfo->order, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
- doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
+ doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
@@ -855,7 +855,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
- doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
+ doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window
@@ -874,7 +874,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
- doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
+ doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
@@ -888,7 +888,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
- doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
@@ -921,7 +921,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
- doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
@@ -948,7 +948,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
- doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
@@ -998,7 +998,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
if (pOperator->status == OP_RES_TO_RETURN) {
- doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
@@ -1035,7 +1035,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
- doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
// TODO: remove for stream
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
@@ -1233,7 +1233,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
- doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
+ doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window
@@ -1252,7 +1252,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
- doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
+ doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
@@ -1265,7 +1265,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
- doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
@@ -1298,7 +1298,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
- doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
+ doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp
index 5ed960af82..c7ec0eece2 100644
--- a/source/libs/executor/test/executorTests.cpp
+++ b/source/libs/executor/test/executorTests.cpp
@@ -948,10 +948,6 @@ TEST(testCase, build_executor_tree_Test) {
code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
ASSERT_EQ(code, 0);
}
-TEST(testCase, index_plan_test) {
- // add later
- EXPECT_EQ(0, 0);
-}
#if 0
TEST(testCase, inMem_sort_Test) {
diff --git a/source/libs/executor/test/indexexcutorTests.cpp b/source/libs/executor/test/indexexcutorTests.cpp
new file mode 100644
index 0000000000..5f1bff45a3
--- /dev/null
+++ b/source/libs/executor/test/indexexcutorTests.cpp
@@ -0,0 +1,253 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wwrite-strings"
+#pragma GCC diagnostic ignored "-Wunused-function"
+#pragma GCC diagnostic ignored "-Wunused-variable"
+#pragma GCC diagnostic ignored "-Wsign-compare"
+
+#include "executor.h"
+#include "executorimpl.h"
+#include "indexoperator.h"
+#include "os.h"
+
+#include "stub.h"
+#include "taos.h"
+#include "tcompare.h"
+#include "tdatablock.h"
+#include "tdef.h"
+#include "trpc.h"
+#include "tvariant.h"
+
+namespace {
+SColumnInfo createColumnInfo(int32_t colId, int32_t type, int32_t bytes) {
+ SColumnInfo info = {0};
+ info.colId = colId;
+ info.type = type;
+ info.bytes = bytes;
+ return info;
+}
+
+int64_t sifLeftV = 21, sifRightV = 10;
+double sifLeftVd = 21.0, sifRightVd = 10.0;
+
+void sifFreeDataBlock(void *block) { blockDataDestroy(*(SSDataBlock **)block); }
+
+void sifInitLogFile() {
+ const char * defaultLogFileNamePrefix = "taoslog";
+ const int32_t maxLogFileNum = 10;
+
+ tsAsyncLog = 0;
+ qDebugFlag = 159;
+ strcpy(tsLogDir, "/tmp/sif");
+ taosRemoveDir(tsLogDir);
+ taosMkDir(tsLogDir);
+
+ if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
+ printf("failed to open log file in directory:%s\n", tsLogDir);
+ }
+}
+
+void sifAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *slotId, bool newBlock, int32_t rows,
+ SColumnInfo *colInfo) {
+ if (newBlock) {
+ SSDataBlock *res = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
+ res->info.numOfCols = 1;
+ res->info.rows = rows;
+ res->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
+ SColumnInfoData idata = {0};
+ idata.info = *colInfo;
+
+ taosArrayPush(res->pDataBlock, &idata);
+ taosArrayPush(pBlockList, &res);
+
+ blockDataEnsureCapacity(res, rows);
+
+ *dataBlockId = taosArrayGetSize(pBlockList) - 1;
+ res->info.blockId = *dataBlockId;
+ *slotId = 0;
+ } else {
+ SSDataBlock *res = *(SSDataBlock **)taosArrayGetLast(pBlockList);
+ res->info.numOfCols++;
+ SColumnInfoData idata = {0};
+ idata.info = *colInfo;
+
+ colInfoDataEnsureCapacity(&idata, 0, rows);
+
+ taosArrayPush(res->pDataBlock, &idata);
+
+ *dataBlockId = taosArrayGetSize(pBlockList) - 1;
+ *slotId = taosArrayGetSize(res->pDataBlock) - 1;
+ }
+}
+
+void sifMakeValueNode(SNode **pNode, int32_t dataType, void *value) {
+ SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_VALUE);
+ SValueNode *vnode = (SValueNode *)node;
+ vnode->node.resType.type = dataType;
+
+ if (IS_VAR_DATA_TYPE(dataType)) {
+ vnode->datum.p = (char *)taosMemoryMalloc(varDataTLen(value));
+ varDataCopy(vnode->datum.p, value);
+ vnode->node.resType.bytes = varDataTLen(value);
+ } else {
+ vnode->node.resType.bytes = tDataTypes[dataType].bytes;
+ assignVal((char *)nodesGetValueFromNode(vnode), (const char *)value, 0, dataType);
+ }
+
+ *pNode = (SNode *)vnode;
+}
+
+void sifMakeColumnNode(SNode **pNode, const char *db, const char *colName, EColumnType colType, uint8_t colValType) {
+ SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_COLUMN);
+ SColumnNode *rnode = (SColumnNode *)node;
+ memcpy(rnode->dbName, db, strlen(db));
+ memcpy(rnode->colName, colName, strlen(colName));
+
+ rnode->colType = colType;
+ rnode->node.resType.type = colValType;
+
+ *pNode = (SNode *)rnode;
+}
+
+void sifMakeOpNode(SNode **pNode, EOperatorType opType, int32_t resType, SNode *pLeft, SNode *pRight) {
+ SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_OPERATOR);
+ SOperatorNode *onode = (SOperatorNode *)node;
+ onode->node.resType.type = resType;
+ onode->node.resType.bytes = tDataTypes[resType].bytes;
+
+ onode->opType = opType;
+ onode->pLeft = pLeft;
+ onode->pRight = pRight;
+
+ *pNode = (SNode *)onode;
+}
+
+void sifMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) {
+ SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
+ SNodeListNode *lnode = (SNodeListNode *)node;
+ lnode->dataType.type = resType;
+ lnode->pNodeList = list;
+
+ *pNode = (SNode *)lnode;
+}
+
+void sifMakeLogicNode(SNode **pNode, ELogicConditionType opType, SNode **nodeList, int32_t nodeNum) {
+ SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
+ SLogicConditionNode *onode = (SLogicConditionNode *)node;
+ onode->condType = opType;
+ onode->node.resType.type = TSDB_DATA_TYPE_BOOL;
+ onode->node.resType.bytes = sizeof(bool);
+
+ onode->pParameterList = nodesMakeList();
+ for (int32_t i = 0; i < nodeNum; ++i) {
+ nodesListAppend(onode->pParameterList, nodeList[i]);
+ }
+
+ *pNode = (SNode *)onode;
+}
+
+void sifMakeTargetNode(SNode **pNode, int16_t dataBlockId, int16_t slotId, SNode *snode) {
+ SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_TARGET);
+ STargetNode *onode = (STargetNode *)node;
+ onode->pExpr = snode;
+ onode->dataBlockId = dataBlockId;
+ onode->slotId = slotId;
+
+ *pNode = (SNode *)onode;
+}
+
+} // namespace
+
+#if 1
+TEST(testCase, index_filter) {
+ {
+ SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
+ sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
+ sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
+ sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight);
+ SArray *result = taosArrayInit(4, sizeof(uint64_t));
+ doFilterTag(opNode, result);
+ EXPECT_EQ(1, taosArrayGetSize(result));
+ taosArrayDestroy(result);
+ nodesDestroyNode(res);
+ }
+ {
+ SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
+ sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
+ sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
+ sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
+
+ SArray *result = taosArrayInit(4, sizeof(uint64_t));
+ doFilterTag(opNode, result);
+ EXPECT_EQ(1, taosArrayGetSize(result));
+
+ taosArrayDestroy(result);
+ nodesDestroyNode(res);
+ }
+}
+
+TEST(testCase, index_filter_varify) {
+ {
+ SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
+ sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
+ sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
+ sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight);
+ nodesDestroyNode(res);
+
+ SIdxFltStatus st = idxGetFltStatus(opNode);
+ EXPECT_EQ(st, SFLT_ACCURATE_INDEX);
+ }
+ {
+ SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
+ sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
+ sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
+ sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
+
+ SIdxFltStatus st = idxGetFltStatus(opNode);
+ EXPECT_EQ(st, SFLT_COARSE_INDEX);
+ nodesDestroyNode(res);
+ }
+ {
+ SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
+ sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
+ sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
+ sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight);
+ nodesDestroyNode(res);
+
+ SIdxFltStatus st = idxGetFltStatus(opNode);
+ EXPECT_EQ(st, SFLT_ACCURATE_INDEX);
+ }
+ {
+ SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
+ sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
+ sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
+ sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
+
+ SIdxFltStatus st = idxGetFltStatus(opNode);
+ EXPECT_EQ(st, SFLT_COARSE_INDEX);
+ nodesDestroyNode(res);
+ }
+}
+
+#endif
+
+#pragma GCC diagnostic pop
diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h
index f5ee29603e..087e243497 100644
--- a/source/libs/function/inc/builtinsimpl.h
+++ b/source/libs/function/inc/builtinsimpl.h
@@ -95,6 +95,9 @@ bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stateCountFunction(SqlFunctionCtx* pCtx);
int32_t stateDurationFunction(SqlFunctionCtx* pCtx);
+bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
+int32_t csumFunction(SqlFunctionCtx* pCtx);
+
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
#ifdef __cplusplus
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index 00a87ab236..e165810e30 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -308,6 +308,37 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
return TSDB_CODE_SUCCESS;
}
+static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
+ if (1 != LIST_LENGTH(pFunc->pParameterList)) {
+ return TSDB_CODE_SUCCESS;
+ }
+
+ SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
+ if (QUERY_NODE_COLUMN != nodeType(pPara)) {
+ return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
+ "The input parameter of CSUM function can only be column");
+ }
+
+ uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
+ uint8_t resType;
+ if (!IS_NUMERIC_TYPE(colType)) {
+ return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
+ } else {
+ if (IS_SIGNED_NUMERIC_TYPE(colType)) {
+ resType = TSDB_DATA_TYPE_BIGINT;
+ } else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
+ resType = TSDB_DATA_TYPE_UBIGINT;
+ } else if (IS_FLOAT_TYPE(colType)) {
+ resType = TSDB_DATA_TYPE_DOUBLE;
+ } else {
+ ASSERT(0);
+ }
+ }
+
+ pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType};
+ return TSDB_CODE_SUCCESS;
+}
+
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo
return TSDB_CODE_SUCCESS;
@@ -742,6 +773,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = stateDurationFunction,
.finalizeFunc = NULL
},
+ {
+ .name = "csum",
+ .type = FUNCTION_TYPE_CSUM,
+ .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
+ .translateFunc = translateCsum,
+ .getEnvFunc = getCsumFuncEnv,
+ .initFunc = functionSetup,
+ .processFunc = csumFunction,
+ .finalizeFunc = NULL
+ },
{
.name = "abs",
.type = FUNCTION_TYPE_ABS,
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index aed576a687..ca470a0521 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -2818,7 +2818,6 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
- SColumnInfoData* pTsOutput = pCtx->pTsOutput;
int32_t numOfElems = 0;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
@@ -2856,7 +2855,6 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
- SColumnInfoData* pTsOutput = pCtx->pTsOutput;
int32_t numOfElems = 0;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
@@ -2896,3 +2894,58 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
return numOfElems;
}
+
+bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
+ pEnv->calcMemSize = sizeof(SSumRes);
+ return true;
+}
+
+int32_t csumFunction(SqlFunctionCtx* pCtx) {
+ SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
+ SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo);
+
+ SInputColumnInfoData* pInput = &pCtx->input;
+ TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
+
+ SColumnInfoData* pInputCol = pInput->pData[0];
+ SColumnInfoData* pTsOutput = pCtx->pTsOutput;
+ SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
+
+ int32_t numOfElems = 0;
+ int32_t type = pInputCol->info.type;
+ int32_t startOffset = pCtx->offset;
+ for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
+ int32_t pos = startOffset + numOfElems;
+ if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
+ //colDataAppendNULL(pOutput, i);
+ continue;
+ }
+
+ char* data = colDataGetData(pInputCol, i);
+ if (IS_SIGNED_NUMERIC_TYPE(type)) {
+ int64_t v;
+ GET_TYPED_DATA(v, int64_t, type, data);
+ pSumRes->isum += v;
+ colDataAppend(pOutput, pos, (char *)&pSumRes->isum, false);
+ } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
+ uint64_t v;
+ GET_TYPED_DATA(v, uint64_t, type, data);
+ pSumRes->usum += v;
+ colDataAppend(pOutput, pos, (char *)&pSumRes->usum, false);
+ } else if (IS_FLOAT_TYPE(type)) {
+ double v;
+ GET_TYPED_DATA(v, double, type, data);
+ pSumRes->dsum += v;
+ colDataAppend(pOutput, pos, (char *)&pSumRes->dsum, false);
+ }
+
+ //TODO: remove this after pTsOutput is handled
+ if (pTsOutput != NULL) {
+ colDataAppendInt64(pTsOutput, pos, &tsList[i]);
+ }
+
+ numOfElems++;
+ }
+
+ return numOfElems;
+}
diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c
index cf7dd50103..fffda68731 100644
--- a/source/libs/tdb/src/db/tdbBtree.c
+++ b/source/libs/tdb/src/db/tdbBtree.c
@@ -114,6 +114,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB
int tdbBtreeClose(SBTree *pBt) {
if (pBt) {
+ tdbFree(pBt->pBuf);
tdbOsFree(pBt);
}
return 0;
diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c
index aa05687426..8574e071f2 100644
--- a/source/libs/tdb/src/db/tdbPCache.c
+++ b/source/libs/tdb/src/db/tdbPCache.c
@@ -242,7 +242,7 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
int h;
h = tdbPCachePageHash(&(pPage->pgid));
- for (ppPage = &(pCache->pgHash[h % pCache->nHash]); *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
+ for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
;
ASSERT(*ppPage == pPage);
*ppPage = pPage->pHashNext;
diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c
index 0cfe75bf33..7dfe1b8989 100644
--- a/source/libs/wal/src/walRead.c
+++ b/source/libs/wal/src/walRead.c
@@ -130,6 +130,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
}
}
+ // code set inner
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
return -1;
}
@@ -249,16 +250,22 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
// TODO: check wal life
if (pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
+ terrno = TSDB_CODE_WAL_INVALID_VER;
+ wError("unexpected wal log version: % " PRId64 ", since seek error", ver);
return -1;
}
}
- if (!taosValidFile(pRead->pReadLogTFile)) {
- return -1;
- }
+ /*if (!taosValidFile(pRead->pReadLogTFile)) {*/
+ /*return -1;*/
+ /*}*/
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
+ if (code < 0)
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ else
+ terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index da1c36dcc4..2e43997584 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -225,6 +225,7 @@ int walRoll(SWal *pWal) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
+ // terrno set inner
code = walRollFileInfo(pWal);
if (code != 0) {
return -1;
diff --git a/tests/script/tsim/stable/alter1.sim b/tests/script/tsim/stable/alter1.sim
index 5cee10756c..1205f50f6e 100644
--- a/tests/script/tsim/stable/alter1.sim
+++ b/tests/script/tsim/stable/alter1.sim
@@ -159,6 +159,7 @@ sql alter table db.stb rename tag t1 tx
print ========== alter common
sql alter table db.stb comment 'abcde' ;
+sql alter table db.stb ttl 10 ;
sql show db.stables;
if $data[0][6] != abcde then