fix(stream): remove table meta in table-meta cache.

This commit is contained in:
Haojun Liao 2024-05-30 18:41:17 +08:00
parent b4956392d9
commit e66283d99c
1 changed files with 85 additions and 68 deletions

View File

@ -39,6 +39,7 @@ static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STab
const char* dstTableName, int64_t* uid); const char* dstTableName, int64_t* uid);
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
const char* id); const char* id);
static int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
int32_t numOfTags); int32_t numOfTags);
@ -396,46 +397,6 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) {
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
taosArrayDestroy(deleteReq.deleteReqs);
return TSDB_CODE_SUCCESS;
}
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code != TSDB_CODE_SUCCESS) {
qError("s-task:%s failed to encode delete request", pTask->id.idStr);
return code;
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
taosArrayDestroy(deleteReq.deleteReqs);
((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);
SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
return TSDB_CODE_SUCCESS;
}
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) { bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
if (pReader->me.type != TSDB_CHILD_TABLE) { if (pReader->me.type != TSDB_CHILD_TABLE) {
tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type); tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
@ -484,23 +445,6 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in
return pCreateTbReq; return pCreateTbReq;
} }
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) {
taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it
return TSDB_CODE_FAILED;
}
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo);
} else {
tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
}
return code;
}
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) { int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
int32_t code = 0; int32_t code = 0;
void* pBuf = NULL; void* pBuf = NULL;
@ -826,17 +770,6 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_
return code; return code;
} }
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* p = taosArrayGet(pBlocks, i);
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
return false;
}
}
return true;
}
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
const SArray* pBlocks = (const SArray*)data; const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
@ -884,6 +817,11 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
if (tbData.pCreateTbReq != NULL) {
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, pDataBlock->info.id.groupId, id);
tbData.pCreateTbReq = NULL;
}
continue; continue;
} }
@ -934,6 +872,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
if (tbData.pCreateTbReq != NULL) { if (tbData.pCreateTbReq != NULL) {
tdDestroySVCreateTbReq(tbData.pCreateTbReq); tdDestroySVCreateTbReq(tbData.pCreateTbReq);
doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, groupId, id);
tbData.pCreateTbReq = NULL; tbData.pCreateTbReq = NULL;
} }
continue; continue;
@ -973,3 +912,81 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
} }
} }
} }
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* p = taosArrayGet(pBlocks, i);
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
return false;
}
}
return true;
}
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) {
taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it
return TSDB_CODE_FAILED;
}
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo);
} else {
tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
}
return code;
}
int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
if (tSimpleHashGetSize(pSinkTableMap) == 0) {
return TSDB_CODE_SUCCESS;
}
int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
tqDebug("s-task:%s remove cached table meta for groupId:%"PRId64, id, groupId);
}
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) {
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
taosArrayDestroy(deleteReq.deleteReqs);
return TSDB_CODE_SUCCESS;
}
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code != TSDB_CODE_SUCCESS) {
qError("s-task:%s failed to encode delete request", pTask->id.idStr);
return code;
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
taosArrayDestroy(deleteReq.deleteReqs);
((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);
SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
return TSDB_CODE_SUCCESS;
}