Merge pull request #12974 from taosdata/feature/stream
enh: asan option
This commit is contained in:
commit
a4e3a9532c
|
@ -71,8 +71,8 @@ ELSE ()
|
|||
ENDIF ()
|
||||
|
||||
IF (${SANITIZER} MATCHES "true")
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3")
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3")
|
||||
MESSAGE(STATUS "Will compile with Address Sanitizer!")
|
||||
ELSE ()
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
||||
|
|
|
@ -51,6 +51,47 @@ int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_
|
|||
return strcmp(pKey1, pKey2);
|
||||
}
|
||||
|
||||
int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) {
|
||||
int32_t code;
|
||||
int32_t vlen;
|
||||
tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
|
||||
ASSERT(code == 0);
|
||||
|
||||
void* buf = taosMemoryCalloc(1, vlen);
|
||||
if (buf == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, vlen);
|
||||
|
||||
if (tEncodeSTqExec(&encoder, pExec) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
TXN txn;
|
||||
|
||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(buf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
||||
if (pTq == NULL) {
|
||||
|
@ -96,8 +137,31 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
|||
int vLen;
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
SDecoder decoder;
|
||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
// create, put into execsj
|
||||
STqExec exec;
|
||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
tDecodeSTqExec(&decoder, &exec);
|
||||
exec.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||
if (exec.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
|
||||
SReadHandle handle = {
|
||||
.reader = exec.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
};
|
||||
exec.task[i] = qCreateStreamExecTaskInfo(exec.qmsg, &handle);
|
||||
ASSERT(exec.task[i]);
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
}
|
||||
exec.pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
}
|
||||
taosHashPut(pTq->execs, pKey, kLen, &exec, sizeof(STqExec));
|
||||
}
|
||||
|
||||
if (tdbTxnClose(&txn) < 0) {
|
||||
|
@ -604,7 +668,9 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn);
|
||||
if (tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
|
@ -659,60 +725,21 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
}
|
||||
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
||||
|
||||
int32_t code;
|
||||
int32_t vlen;
|
||||
tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
|
||||
ASSERT(code == 0);
|
||||
|
||||
void* buf = taosMemoryCalloc(1, vlen);
|
||||
if (buf == NULL) {
|
||||
ASSERT(0);
|
||||
if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, vlen);
|
||||
|
||||
if (tEncodeSTqExec(&encoder, pExec) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
TXN txn;
|
||||
|
||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbTbUpsert(pTq->pExecStore, req.subKey, (int)strlen(req.subKey), buf, vlen, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(buf);
|
||||
|
||||
return 0;
|
||||
} else {
|
||||
/*if (req.newConsumerId != -1) {*/
|
||||
/*taosWLockLatch(&pExec->lock);*/
|
||||
ASSERT(pExec->consumerId == req.oldConsumerId);
|
||||
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
||||
// TODO handle qmsg and exec modification
|
||||
atomic_store_32(&pExec->epoch, -1);
|
||||
atomic_store_64(&pExec->consumerId, req.newConsumerId);
|
||||
atomic_add_fetch_32(&pExec->epoch, 1);
|
||||
/*taosWUnLockLatch(&pExec->lock);*/
|
||||
|
||||
if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
|
||||
// TODO
|
||||
}
|
||||
return 0;
|
||||
/*} else {*/
|
||||
// TODO
|
||||
/*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/
|
||||
/*return 0;*/
|
||||
/*}*/
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -708,7 +708,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
|
|||
pNewNode->removed = 0;
|
||||
pNewNode->next = NULL;
|
||||
|
||||
memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize);
|
||||
if (pData) memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize);
|
||||
memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen);
|
||||
|
||||
return pNewNode;
|
||||
|
@ -774,7 +774,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
|
|||
ASSERT(prevNode->next != prevNode);
|
||||
} else {
|
||||
pe->next = pOld->next;
|
||||
SHashNode* x = pe->next;
|
||||
SHashNode *x = pe->next;
|
||||
if (x != NULL) {
|
||||
ASSERT(x->next != x);
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@
|
|||
# ---- stream
|
||||
./test.sh -f tsim/stream/basic0.sim
|
||||
./test.sh -f tsim/stream/basic1.sim
|
||||
./test.sh -f tsim/stream/basic2.sim
|
||||
./test.sh -f tsim/stream/session0.sim
|
||||
./test.sh -f tsim/stream/session1.sim
|
||||
|
||||
|
|
Loading…
Reference in New Issue