Merge pull request #28403 from taosdata/fix/syntax
refactor: do some internal refactor.
This commit is contained in:
commit
d715e50d68
|
@ -236,7 +236,7 @@ typedef struct {
|
||||||
void* vnode; // not available to encoder and decoder
|
void* vnode; // not available to encoder and decoder
|
||||||
FTbSink* tbSinkFunc;
|
FTbSink* tbSinkFunc;
|
||||||
STSchema* pTSchema;
|
STSchema* pTSchema;
|
||||||
SSHashObj* pTblInfo;
|
SSHashObj* pTbInfo;
|
||||||
} STaskSinkTb;
|
} STaskSinkTb;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -746,13 +746,13 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
pOutputInfo->tbSink.pTbInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||||
if (pOutputInfo->tbSink.pTblInfo == NULL) {
|
if (pOutputInfo->tbSink.pTbInfo == NULL) {
|
||||||
tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
|
tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr);
|
tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTbInfo, freePtr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
|
#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
|
||||||
|
|
||||||
typedef struct STableSinkInfo {
|
typedef struct STableSinkInfo {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
tstr name;
|
tstr name;
|
||||||
|
@ -35,16 +37,22 @@ static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema
|
||||||
int64_t earlyTs, const char* id);
|
int64_t earlyTs, const char* id);
|
||||||
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
|
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
|
||||||
const char* dstTableName, int64_t* uid);
|
const char* dstTableName, int64_t* uid);
|
||||||
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
|
|
||||||
const char* id);
|
|
||||||
static int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
|
|
||||||
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
|
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
|
||||||
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
|
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
|
||||||
int32_t numOfTags);
|
int32_t numOfTags);
|
||||||
static int32_t createDefaultTagColName(SArray** pColNameList);
|
static int32_t createDefaultTagColName(SArray** pColNameList);
|
||||||
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
|
||||||
int64_t gid, bool newSubTableRule);
|
const char* stbFullName, int64_t gid, bool newSubTableRule);
|
||||||
static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo);
|
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
|
||||||
|
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
|
||||||
|
const char* id);
|
||||||
|
static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
|
||||||
|
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
|
||||||
|
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
|
||||||
|
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
|
||||||
|
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
|
||||||
|
int64_t earlyTs);
|
||||||
|
|
||||||
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
const char* pIdStr, bool newSubTableRule) {
|
const char* pIdStr, bool newSubTableRule) {
|
||||||
|
@ -81,7 +89,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
|
||||||
|
|
||||||
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
||||||
name[varDataLen(varTbName)] = '\0';
|
name[varDataLen(varTbName)] = '\0';
|
||||||
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 && stbFullName) {
|
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
|
||||||
|
stbFullName) {
|
||||||
int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
|
int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -161,16 +170,6 @@ end:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tqGetTableInfo(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
|
|
||||||
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
|
|
||||||
if (pVal) {
|
|
||||||
*pInfo = *(STableSinkInfo**)pVal;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
|
@ -245,18 +244,17 @@ int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock*
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
|
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
|
||||||
} else {
|
} else {
|
||||||
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
|
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
|
||||||
if (pCreateTableReq->name == NULL) {
|
if (pCreateTableReq->name == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
// tqDebug("copy name:%s", pDataBlock->info.parTbName);
|
// tqDebug("copy name:%s", pDataBlock->info.parTbName);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
|
int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
|
||||||
return code;
|
return code;
|
||||||
// tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -266,14 +264,18 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
||||||
SStreamTask* pTask, int64_t suid) {
|
SStreamTask* pTask, int64_t suid) {
|
||||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
|
SArray* tagArray = NULL;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
STableSinkInfo* pInfo = NULL;
|
||||||
|
SVCreateTbBatchReq reqs = {0};
|
||||||
|
SArray* crTblArray = NULL;
|
||||||
|
|
||||||
tqDebug("s-task:%s build create %d table(s) msg", id, rows);
|
tqDebug("s-task:%s build create %d table(s) msg", id, rows);
|
||||||
SVCreateTbBatchReq reqs = {0};
|
|
||||||
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
|
tagArray = taosArrayInit(4, sizeof(STagVal));
|
||||||
|
crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
|
||||||
if ((NULL == reqs.pArray) || (tagArray == NULL)) {
|
if ((NULL == reqs.pArray) || (tagArray == NULL)) {
|
||||||
tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
|
tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
|
||||||
code = terrno;
|
code = terrno;
|
||||||
|
@ -291,6 +293,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
||||||
tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
|
tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClear(tagArray);
|
taosArrayClear(tagArray);
|
||||||
|
|
||||||
if (size == 2) {
|
if (size == 2) {
|
||||||
|
@ -356,8 +359,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid,
|
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask));
|
||||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -368,16 +370,15 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableSinkInfo* pInfo = NULL;
|
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
|
||||||
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, gid, &pInfo);
|
|
||||||
if (!alreadyCached) {
|
if (!alreadyCached) {
|
||||||
code = doCreateSinkInfo(pCreateTbReq->name, &pInfo);
|
code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
|
tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, id);
|
code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
|
tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
|
||||||
}
|
}
|
||||||
|
@ -527,8 +528,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
|
||||||
taosArrayDestroy(pExisted->aRowP);
|
taosArrayDestroy(pExisted->aRowP);
|
||||||
pExisted->aRowP = pFinal;
|
pExisted->aRowP = pFinal;
|
||||||
|
|
||||||
tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d",
|
tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
|
||||||
id, (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
|
(int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
|
||||||
(pNew->pCreateTbReq != NULL));
|
(pNew->pCreateTbReq != NULL));
|
||||||
|
|
||||||
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
|
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
|
||||||
|
@ -806,7 +807,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
|
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
|
||||||
int32_t nameLen = strlen(pDstTableName);
|
int32_t nameLen = strlen(pDstTableName);
|
||||||
(*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
|
(*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
|
||||||
if (*pInfo == NULL) {
|
if (*pInfo == NULL) {
|
||||||
|
@ -830,7 +831,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
STableSinkInfo* pTableSinkInfo = NULL;
|
STableSinkInfo* pTableSinkInfo = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo);
|
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
|
||||||
|
|
||||||
if (alreadyCached) {
|
if (alreadyCached) {
|
||||||
if (dstTableName[0] == 0) { // data block does not set the destination table name
|
if (dstTableName[0] == 0) { // data block does not set the destination table name
|
||||||
|
@ -870,7 +871,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doCreateSinkInfo(dstTableName, &pTableSinkInfo);
|
code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
|
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
|
||||||
} else {
|
} else {
|
||||||
|
@ -906,14 +907,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
|
|
||||||
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
||||||
if (pTagArray == NULL) {
|
if (pTagArray == NULL) {
|
||||||
|
tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
|
||||||
|
tstrerror(terrno));
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||||
code =
|
code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
||||||
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq);
|
||||||
(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1),
|
|
||||||
&pTableData->pCreateTbReq);
|
|
||||||
taosArrayDestroy(pTagArray);
|
taosArrayDestroy(pTagArray);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -923,12 +924,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableSinkInfo->uid = 0;
|
pTableSinkInfo->uid = 0;
|
||||||
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
|
||||||
} else {
|
} else {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id,
|
tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
|
||||||
vgId, dstTableName);
|
dstTableName);
|
||||||
return TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
return TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -944,7 +945,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
pTableSinkInfo->uid = mr.me.uid;
|
pTableSinkInfo->uid = mr.me.uid;
|
||||||
|
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -952,7 +953,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||||
SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
|
SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
|
||||||
int32_t numOfRows = pDataBlock->info.rows;
|
int32_t numOfRows = pDataBlock->info.rows;
|
||||||
char* dstTableName = pDataBlock->info.parTbName;
|
char* dstTableName = pDataBlock->info.parTbName;
|
||||||
|
@ -975,6 +976,43 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||||
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
|
||||||
|
if (pTask->outputInfo.tbSink.pTagSchema == NULL) {
|
||||||
|
SMetaReader mer1 = {0};
|
||||||
|
metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
|
||||||
|
|
||||||
|
code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
|
||||||
|
metaReaderClear(&mer1);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
|
||||||
|
metaReaderClear(&mer1);
|
||||||
|
|
||||||
|
if (pOutputInfo->tbSink.pTagSchema == NULL) {
|
||||||
|
tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno));
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
|
||||||
|
SSchema* pCol1 = &pTagSchema->pSchema[0];
|
||||||
|
if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) {
|
||||||
|
pOutputInfo->tbSink.autoCreateCtb = true;
|
||||||
|
} else {
|
||||||
|
pOutputInfo->tbSink.autoCreateCtb = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
const SArray* pBlocks = (const SArray*)data;
|
const SArray* pBlocks = (const SArray*)data;
|
||||||
SVnode* pVnode = (SVnode*)vnode;
|
SVnode* pVnode = (SVnode*)vnode;
|
||||||
|
@ -988,29 +1026,11 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
|
int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
|
||||||
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||||
|
|
||||||
if (pTask->outputInfo.tbSink.pTagSchema == NULL) {
|
code = checkTagSchema(pTask, pVnode);
|
||||||
SMetaReader mer1 = {0};
|
|
||||||
metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
|
|
||||||
|
|
||||||
code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
|
|
||||||
metaReaderClear(&mer1);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
|
|
||||||
metaReaderClear(&mer1);
|
|
||||||
|
|
||||||
SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
|
|
||||||
SSchema* pCol1 = &pTagSchema->pSchema[0];
|
|
||||||
if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) {
|
|
||||||
pOutputInfo->tbSink.autoCreateCtb = true;
|
|
||||||
} else {
|
|
||||||
pOutputInfo->tbSink.autoCreateCtb = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
|
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
|
||||||
if (!onlySubmitData) {
|
if (!onlySubmitData) {
|
||||||
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
|
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
|
||||||
|
@ -1033,45 +1053,127 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
pTask->execInfo.sink.numOfBlocks += 1;
|
code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
|
||||||
|
|
||||||
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
|
||||||
if (submitReq.aSubmitTbData == NULL) {
|
|
||||||
code = terrno;
|
|
||||||
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
|
||||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
|
|
||||||
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
|
||||||
if (tbData.pCreateTbReq != NULL) {
|
|
||||||
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
|
||||||
(void) doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, pDataBlock->info.id.groupId, id);
|
|
||||||
tbData.pCreateTbReq = NULL;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
|
||||||
if (p == NULL) {
|
|
||||||
tqDebug("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
|
|
||||||
if (code) { // failed and continue
|
|
||||||
tqDebug("vgId:%d, s-task:%s submit msg failed, data lost", vgId, id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
|
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
|
||||||
|
if (streamTaskShouldStop(pTask)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
|
||||||
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
|
SSDataBlock* p = taosArrayGet(pBlocks, i);
|
||||||
|
if (p == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
|
||||||
|
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFreeClear(pTableSinkInfo);
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
|
||||||
|
pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
|
||||||
|
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
|
||||||
|
if (pVal) {
|
||||||
|
*pInfo = *(STableSinkInfo**)pVal;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
|
||||||
|
if (tSimpleHashGetSize(pSinkTableMap) == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
|
||||||
|
if (code == 0) {
|
||||||
|
tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
|
||||||
|
} else {
|
||||||
|
tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
||||||
|
int64_t suid) {
|
||||||
|
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
|
||||||
|
if (deleteReq.deleteReqs == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code =
|
||||||
|
tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
|
||||||
|
taosArrayDestroy(deleteReq.deleteReqs);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len;
|
||||||
|
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("s-task:%s failed to encode delete request", pTask->id.idStr);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
||||||
|
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
||||||
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
taosArrayDestroy(deleteReq.deleteReqs);
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
|
SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
|
||||||
|
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||||
|
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
|
||||||
|
int32_t code = 0;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
|
||||||
|
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||||
|
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||||
|
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
|
||||||
|
|
||||||
SHashObj* pTableIndexMap =
|
SHashObj* pTableIndexMap =
|
||||||
taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
@ -1085,12 +1187,6 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
|
|
||||||
bool hasSubmit = false;
|
bool hasSubmit = false;
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
if (streamTaskShouldStop(pTask)) {
|
|
||||||
taosHashCleanup(pTableIndexMap);
|
|
||||||
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -1118,7 +1214,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
||||||
if (tbData.pCreateTbReq != NULL) {
|
if (tbData.pCreateTbReq != NULL) {
|
||||||
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||||
(void) doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, groupId, id);
|
(void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
|
||||||
tbData.pCreateTbReq = NULL;
|
tbData.pCreateTbReq = NULL;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -1171,94 +1267,52 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||||
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
|
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
|
||||||
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
|
int32_t code = 0;
|
||||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||||
SSDataBlock* p = taosArrayGet(pBlocks, i);
|
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||||
if (p == NULL) {
|
const char* id = pTask->id.idStr;
|
||||||
continue;
|
int32_t vgId = TD_VID(pVnode);
|
||||||
}
|
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
|
||||||
|
|
||||||
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
|
pTask->execInfo.sink.numOfBlocks += 1;
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
||||||
}
|
if (submitReq.aSubmitTbData == NULL) {
|
||||||
|
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(terrno));
|
||||||
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
|
|
||||||
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
taosMemoryFreeClear(pTableSinkInfo);
|
|
||||||
} else {
|
|
||||||
tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
|
|
||||||
pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
|
|
||||||
if (tSimpleHashGetSize(pSinkTableMap) == 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
|
|
||||||
if (code == 0) {
|
|
||||||
tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
|
|
||||||
} else {
|
|
||||||
tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
|
||||||
int64_t suid) {
|
|
||||||
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
|
|
||||||
if (deleteReq.deleteReqs == NULL) {
|
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
|
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
|
code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
|
||||||
taosArrayDestroy(deleteReq.deleteReqs);
|
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
||||||
return TSDB_CODE_SUCCESS;
|
if (tbData.pCreateTbReq != NULL) {
|
||||||
|
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||||
|
(void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
|
||||||
|
tbData.pCreateTbReq = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len;
|
|
||||||
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
qError("s-task:%s failed to encode delete request", pTask->id.idStr);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEncoder encoder = {0};
|
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
||||||
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
if (p == NULL) {
|
||||||
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(terrno));
|
||||||
tEncoderInit(&encoder, abuf, len);
|
return terrno;
|
||||||
code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
}
|
||||||
tEncoderClear(&encoder);
|
|
||||||
taosArrayDestroy(deleteReq.deleteReqs);
|
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
|
||||||
|
if (code) { // failed and continue
|
||||||
|
tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
if (code) {
|
|
||||||
return code;
|
return code;
|
||||||
}
|
|
||||||
|
|
||||||
((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);
|
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
|
|
||||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
|
||||||
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -289,7 +289,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||||
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
|
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
|
||||||
taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
|
taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
|
||||||
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
|
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
|
||||||
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
|
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
|
|
Loading…
Reference in New Issue