fix:[TS-4593] check info error

This commit is contained in:
wangmm0220 2024-07-16 18:56:20 +08:00
parent 1e1612a5fa
commit 0704dcf45d
5 changed files with 60 additions and 101 deletions

View File

@ -725,19 +725,19 @@ static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicO
continue;
}
buf = taosMemoryCalloc(1, sizeof(SMsgHead) + sizeof(pTopic->ntbUid));
buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
if (buf == NULL){
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
*(int64_t*)abuf = pTopic->ntbUid;
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + sizeof(pTopic->ntbUid);
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
code = mndTransAppendRedoAction(pTrans, &action);
if (code != 0) {

View File

@ -132,7 +132,6 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen);
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen);
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle);
void* tqMetaGetCheckInfo(STQ* pTq, int64_t tbUid);
void* tqMetaGetOffset(STQ* pTq, const char* subkey);
int32_t tqMetaTransform(STQ* pTq);
// tqSink

View File

@ -76,7 +76,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosInitRWLatch(&pTq->lock);
pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK);
@ -282,16 +282,25 @@ end:
}
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
STqCheckInfo* pCheck = tqMetaGetCheckInfo(pTq, tbUid);
if(pCheck == NULL) {
return 0;
}
void* pIter = NULL;
int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) {
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
if (forbidColId == colId) {
return -1;
while (1) {
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
if (pIter == NULL) {
break;
}
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
if (pCheck->ntbUid == tbUid) {
int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) {
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
if (forbidColId == colId) {
taosHashCancelIterate(pTq->pCheckInfo, pIter);
return -1;
}
}
}
}
@ -585,39 +594,18 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
return 0;
}
void mergeTwoArray(SArray* a, SArray* b){
size_t bLen = taosArrayGetSize(b);
for(int i = 0; i < bLen; i++){
void* dataB = taosArrayGet(b, i);
size_t aLen = taosArrayGetSize(a);
int j = 0;
for(; j < aLen; j++){
void* dataA = taosArrayGet(a, i);
if(*(int64_t*)dataA == *(int64_t*)dataB){
break;
}
}
if(j == aLen){
taosArrayPush(a, dataB);
}
}
}
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqCheckInfo info = {0};
if(tqMetaDecodeCheckInfo(&info, msg, msgLen) != 0){
return -1;
}
STqCheckInfo *old = tqMetaGetCheckInfo(pTq, info.ntbUid);
if (old != NULL){
mergeTwoArray(info.colIdList, old->colIdList);
}
if (taosHashPut(pTq->pCheckInfo, &info.ntbUid, sizeof(info.ntbUid), &info, sizeof(STqCheckInfo)) < 0) {
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDeleteSTqCheckInfo(&info);
return -1;
}
if (tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.ntbUid, sizeof(info.ntbUid), msg, msgLen) < 0) {
if (tqMetaSaveInfo(pTq, pTq->pCheckStore, info.topic, strlen(info.topic), msg, msgLen) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -629,7 +617,8 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, msgLen) < 0) {
if (tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, strlen(msg)) < 0) {
tqError("cannot process tq delete check info req %s, since no such check info", msg);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}

View File

@ -87,31 +87,6 @@ int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){
return code;
}
void* tqMetaGetCheckInfo(STQ* pTq, int64_t tbUid){
void* data = taosHashGet(pTq->pCheckInfo, &tbUid, sizeof(tbUid));
if (data == NULL) {
int vLen = 0;
if (tdbTbGet(pTq->pCheckStore, &tbUid, sizeof(tbUid), &data, &vLen) < 0) {
tdbFree(data);
return NULL;
}
STqCheckInfo info= {0};
if(tqMetaDecodeCheckInfo(&info, data, vLen) != 0) {
tdbFree(data);
return NULL;
}
tdbFree(data);
if(taosHashPut(pTq->pCheckInfo, &tbUid, sizeof(tbUid), &info, sizeof(STqCheckInfo)) != 0){
tDeleteSTqCheckInfo(&info);
return NULL;
}
return taosHashGet(pTq->pCheckInfo, &tbUid, sizeof(tbUid));
} else {
return data;
}
}
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) {
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;
@ -326,7 +301,7 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
static int32_t tqMetaTransformStoreInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew){
TBC* pCur = NULL;
void* pKey = NULL;
int kLen = 0;
@ -336,12 +311,12 @@ static int32_t tqMetaTransformStoreInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* p
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tdbTbcOpen(pExecStoreOld, &pCur, NULL));
TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
TQ_ERR_GO_TO_END (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn));
TQ_ERR_GO_TO_END (tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
}
TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn));
@ -354,37 +329,6 @@ END:
return code;
}
static int32_t tqMetaTransformCheckInfo(TDB* pMetaDB, TTB* pCheckOld, TTB* pCheckNew){
TBC* pCur = NULL;
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
TXN* txn = NULL;
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tdbTbcOpen(pCheckOld, &pCur, NULL));
TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqCheckInfo info= {0};
TQ_ERR_GO_TO_END(tqMetaDecodeCheckInfo(&info, pVal, vLen));
int64_t uid = info.ntbUid;
tDeleteSTqCheckInfo(&info);
TQ_ERR_GO_TO_END (tdbTbUpsert(pCheckNew, &uid, sizeof(uid), pVal, vLen, txn));
}
TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn));
TQ_ERR_GO_TO_END (tdbPostCommit(pMetaDB, txn));
END:
tdbFree(pKey);
tdbFree(pVal);
(void)tdbTbcClose(pCur);
return code;
}
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
void* data = taosHashGet(pTq->pHandle, key, strlen(key));
if(data == NULL){
@ -434,6 +378,32 @@ static int32_t replaceTqPath(char** path){
return TDB_CODE_SUCCESS;
}
static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
TBC* pCur = NULL;
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
int32_t code = 0;
STqCheckInfo info = {0};
TQ_ERR_GO_TO_END(tdbTbcOpen(pTq->pCheckStore, &pCur, NULL));
TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
TQ_ERR_GO_TO_END(tqMetaDecodeCheckInfo(&info, pVal, vLen));
TQ_ERR_GO_TO_END(taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)));
}
info.colIdList = NULL;
END:
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
tDeleteSTqCheckInfo(&info);
return code;
}
int32_t tqMetaOpen(STQ* pTq) {
char* maindb = NULL;
int32_t code = TDB_CODE_SUCCESS;
@ -444,6 +414,7 @@ int32_t tqMetaOpen(STQ* pTq) {
}else{
TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
}
TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));
END:
taosMemoryFree(maindb);
@ -466,8 +437,8 @@ int32_t tqMetaTransform(STQ* pTq) {
TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
TQ_ERR_GO_TO_END(tqMetaTransformStoreInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore));
TQ_ERR_GO_TO_END(tqMetaTransformCheckInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore));
TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore));
TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore));
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){

View File

@ -211,7 +211,7 @@ int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nD
goto _err;
}
code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.ntbUid, sizeof(info.ntbUid), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code) goto _err;
return code;