fix(tmq): save ntb check to another tb
This commit is contained in:
parent
778aa44fc2
commit
3e0e7a87be
|
@ -25,17 +25,17 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
|
||||||
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
||||||
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
|
int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
|
||||||
if (tEncodeI32(pEncoder, size) < 0) return -1;
|
if (tEncodeI32(pEncoder, size) < 0) return -1;
|
||||||
void *pIter = NULL;
|
void* pIter = NULL;
|
||||||
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
|
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
|
||||||
while(pIter){
|
while (pIter) {
|
||||||
int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL);
|
int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL);
|
||||||
if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
|
if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
|
||||||
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
|
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
|
||||||
}
|
}
|
||||||
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
|
@ -52,17 +52,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
||||||
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
||||||
}else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
||||||
for(int32_t i = 0; i < size; i++){
|
for (int32_t i = 0; i < size; i++) {
|
||||||
int64_t tbUid = 0;
|
int64_t tbUid = 0;
|
||||||
if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
|
||||||
taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
|
taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
|
||||||
}
|
}
|
||||||
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
|
@ -117,7 +117,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) {
|
if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, &txn) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,7 +284,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
};
|
};
|
||||||
|
|
||||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
|
||||||
handle.execHandle.task = qCreateQueueExecTaskInfo(
|
handle.execHandle.task = qCreateQueueExecTaskInfo(
|
||||||
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
|
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
|
||||||
ASSERT(handle.execHandle.task);
|
ASSERT(handle.execHandle.task);
|
||||||
|
@ -297,9 +296,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
|
|
||||||
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
||||||
handle.execHandle.task =
|
(SSnapContext**)(&reader.sContext));
|
||||||
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
|
||||||
|
@ -314,9 +313,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
|
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
||||||
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
||||||
handle.execHandle.task =
|
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
||||||
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||||
}
|
}
|
||||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
||||||
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
||||||
|
|
Loading…
Reference in New Issue