Merge pull request #13415 from taosdata/feature/tq
feat(tmq): support check col and tag modifiable
This commit is contained in:
commit
b95ec7b7d2
|
@ -37,6 +37,8 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
|||
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
||||
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colIds);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -70,6 +70,56 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
|||
return strchr(topic, '.') + 1;
|
||||
}
|
||||
|
||||
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colAndTagIds) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
bool found = false;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
continue;
|
||||
}
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
||||
ASSERT(0);
|
||||
return false;
|
||||
}
|
||||
|
||||
SHashObj *pColHash = NULL;
|
||||
SNodeList *pNodeList;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
if (pCol->tableId != suid) goto NEXT;
|
||||
if (pColHash == NULL) {
|
||||
pColHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||
}
|
||||
if (pCol->colId > 0) {
|
||||
taosHashPut(pColHash, &pCol->colId, sizeof(int16_t), NULL, 0);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(colAndTagIds); i++) {
|
||||
int16_t *pColId = taosArrayGet(colAndTagIds, i);
|
||||
if (taosHashGet(pColHash, pColId, sizeof(int16_t)) != NULL) {
|
||||
found = true;
|
||||
goto NEXT;
|
||||
}
|
||||
}
|
||||
|
||||
NEXT:
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
if (found) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
|
|
|
@ -20,6 +20,28 @@ void tqTmrRspFunc(void* param, void* tmrId) {
|
|||
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
|
||||
}
|
||||
|
||||
static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataBlkRsp* pRsp) {
|
||||
SStreamDataSubmit* pSubmit = *ppSubmit;
|
||||
while (pSubmit != NULL) {
|
||||
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
|
||||
if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
}
|
||||
// update processed
|
||||
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
|
||||
streamQSetSuccess(&pHandle->pushHandle.inputQ);
|
||||
streamDataSubmitRefDec(pSubmit);
|
||||
if (pRsp->blockNum > 0) {
|
||||
*ppSubmit = pSubmit;
|
||||
return 0;
|
||||
} else {
|
||||
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
|
||||
}
|
||||
}
|
||||
*ppSubmit = pSubmit;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
|
||||
SMqDataBlkRsp rsp = {0};
|
||||
// 1. guard and set status executing
|
||||
|
@ -42,38 +64,14 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
|
|||
}
|
||||
// 3. exec, after each success, update processed ver
|
||||
// first run
|
||||
while (pSubmit != NULL) {
|
||||
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
|
||||
if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, &rsp, 0) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
}
|
||||
// update processed
|
||||
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
|
||||
streamQSetSuccess(&pHandle->pushHandle.inputQ);
|
||||
streamDataSubmitRefDec(pSubmit);
|
||||
if (rsp.blockNum > 0) {
|
||||
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
|
||||
goto SEND_RSP;
|
||||
} else {
|
||||
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
|
||||
}
|
||||
}
|
||||
// set exec status closing
|
||||
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__CLOSING);
|
||||
// second run
|
||||
while (pSubmit != NULL) {
|
||||
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
|
||||
if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, &rsp, 0) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
}
|
||||
// update processed
|
||||
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
|
||||
streamQSetSuccess(&pHandle->pushHandle.inputQ);
|
||||
streamDataSubmitRefDec(pSubmit);
|
||||
if (rsp.blockNum > 0) {
|
||||
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
|
||||
goto SEND_RSP;
|
||||
} else {
|
||||
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
|
||||
}
|
||||
}
|
||||
// set exec status idle
|
||||
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "walInt.h"
|
||||
|
||||
void walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||
pWal->vers.firstVer = -1;
|
||||
/*pWal->vers.firstVer = -1;*/
|
||||
pWal->vers.lastVer = ver;
|
||||
pWal->vers.commitVer = ver - 1;
|
||||
pWal->vers.snapshotVer = ver - 1;
|
||||
|
|
Loading…
Reference in New Issue