Merge pull request #12931 from taosdata/feature/tq
feat(tmq): use tdb as persistent storage
This commit is contained in:
commit
8f795a638a
|
@ -96,7 +96,8 @@ struct STQ {
|
|||
SHashObj* pStreamTasks;
|
||||
SVnode* pVnode;
|
||||
SWal* pWal;
|
||||
TDB* pTdb;
|
||||
TDB* pMetaStore;
|
||||
TTB* pExecStore;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "tq.h"
|
||||
#include "tdbInt.h"
|
||||
|
||||
int32_t tqInit() {
|
||||
int8_t old;
|
||||
|
@ -46,6 +47,10 @@ void tqCleanUp() {
|
|||
}
|
||||
}
|
||||
|
||||
int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) {
|
||||
return strcmp(pKey1, pKey2);
|
||||
}
|
||||
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
||||
if (pTq == NULL) {
|
||||
|
@ -55,9 +60,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
|||
pTq->path = strdup(path);
|
||||
pTq->pVnode = pVnode;
|
||||
pTq->pWal = pWal;
|
||||
if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
||||
|
@ -65,6 +67,43 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
|||
|
||||
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
|
||||
if (tdbOpen(path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbTbOpen("exec", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
TXN txn;
|
||||
|
||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
/*if (tdbBegin(pTq->pMetaStore, &txn) < 0) {*/
|
||||
/*ASSERT(0);*/
|
||||
/*}*/
|
||||
|
||||
TBC* pCur;
|
||||
if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
void* pKey;
|
||||
int kLen;
|
||||
void* pVal;
|
||||
int vLen;
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
// create, put into execsj
|
||||
}
|
||||
|
||||
if (tdbTxnClose(&txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return pTq;
|
||||
}
|
||||
|
||||
|
@ -74,7 +113,7 @@ void tqClose(STQ* pTq) {
|
|||
taosHashCleanup(pTq->execs);
|
||||
taosHashCleanup(pTq->pStreamTasks);
|
||||
taosHashCleanup(pTq->pushMgr);
|
||||
tdbClose(pTq->pTdb);
|
||||
tdbClose(pTq->pMetaStore);
|
||||
taosMemoryFree(pTq);
|
||||
}
|
||||
// TODO
|
||||
|
@ -91,7 +130,6 @@ int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
|
|||
if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
|
||||
// TODO encode modified exec
|
||||
}
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
|
@ -108,7 +146,6 @@ int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
|
|||
if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
|
||||
// TODO decode modified exec
|
||||
}
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
|
@ -556,6 +593,23 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
|
||||
int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
|
||||
ASSERT(code == 0);
|
||||
|
||||
TXN txn;
|
||||
|
||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn);
|
||||
|
||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -604,6 +658,45 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
}
|
||||
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
||||
|
||||
int32_t code;
|
||||
int32_t vlen;
|
||||
tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
|
||||
ASSERT(code == 0);
|
||||
|
||||
void* buf = taosMemoryCalloc(1, vlen);
|
||||
if (buf == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, vlen);
|
||||
|
||||
if (tEncodeSTqExec(&encoder, pExec) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
TXN txn;
|
||||
|
||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbTbUpsert(pTq->pExecStore, req.subKey, (int)strlen(req.subKey), buf, vlen, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(buf);
|
||||
|
||||
return 0;
|
||||
} else {
|
||||
/*if (req.newConsumerId != -1) {*/
|
||||
|
|
|
@ -83,11 +83,11 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
|
|||
|
||||
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid,
|
||||
int32_t* pNumOfRows, int16_t* pNumOfCols) {
|
||||
/*int32_t sversion = pHandle->pBlock->sversion;*/
|
||||
// TODO set to real sversion
|
||||
*pUid = 0;
|
||||
|
||||
int32_t sversion = 1;
|
||||
// TODO set to real sversion
|
||||
/*int32_t sversion = 1;*/
|
||||
int32_t sversion = htonl(pHandle->pBlock->sversion);
|
||||
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
|
||||
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
|
||||
if (pHandle->pSchema == NULL) {
|
||||
|
|
Loading…
Reference in New Issue