fix(stream): fix error in snode, fix memory leaks, fix repeatly create bug in sink task.

This commit is contained in:
Haojun Liao 2023-08-29 17:34:31 +08:00
parent ff2e1fbd9a
commit 204fb97f68
6 changed files with 411 additions and 266 deletions

View File

@ -106,7 +106,7 @@ typedef struct {
int8_t type; int8_t type;
} SStreamQueueItem; } SStreamQueueItem;
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data);
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
typedef struct { typedef struct {

View File

@ -85,8 +85,6 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 65535
#define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE) #define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) #define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define NCHAR_WIDTH_TO_BYTES(n) ((n)*TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE)
typedef int32_t VarDataOffsetT; typedef int32_t VarDataOffsetT;
typedef struct tstr { typedef struct tstr {

View File

@ -155,7 +155,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink // tqSink
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr); const char* pIdStr);
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data); void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data);
// tqOffset // tqOffset
char* tqOffsetBuildFName(const char* path, int32_t fVer); char* tqOffsetBuildFName(const char* path, int32_t fVer);

View File

@ -21,9 +21,14 @@
typedef struct STableSinkInfo { typedef struct STableSinkInfo {
uint64_t uid; uint64_t uid;
char tbName[TSDB_TABLE_NAME_LEN]; tstr name;
} STableSinkInfo; } STableSinkInfo;
static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
SSDataBlock* pDataBlock, SStreamTask* pTask);
static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr) {
int32_t totalRows = pDataBlock->info.rows; int32_t totalRows = pDataBlock->info.rows;
@ -97,17 +102,17 @@ end:
return ret; return ret;
} }
static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) { static bool tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) {
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
if (pVal) { if (pVal) {
*pInfo = *(STableSinkInfo**)pVal; *pInfo = *(STableSinkInfo**)pVal;
return TSDB_CODE_SUCCESS; return true;
} }
return TSDB_CODE_FAILED; return false;
} }
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { static int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) {
if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) { if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -115,7 +120,7 @@ int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTb
return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES);
} }
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
void* buf = NULL; void* buf = NULL;
int32_t tlen = 0; int32_t tlen = 0;
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
@ -128,59 +133,32 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
const SArray* pBlocks = (const SArray*)data; const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
int64_t suid = pTask->tbSink.stbUid; int64_t suid = pTask->tbSink.stbUid;
char* stbFullName = pTask->tbSink.stbFullName; char* stbFullName = pTask->tbSink.stbFullName;
STSchema* pTSchema = pTask->tbSink.pTSchema; STSchema* pTSchema = pTask->tbSink.pTSchema;
int32_t vgId = TD_VID(pVnode);
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
int32_t code = TSDB_CODE_SUCCESS;
int32_t blockSz = taosArrayGetSize(pBlocks); tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks);
tqDebug("vgId:%d, s-task:%s write results %d blocks into table", TD_VID(pVnode), pTask->id.idStr, blockSz);
void* pBuf = NULL;
SArray* tagArray = NULL; SArray* tagArray = NULL;
SArray* pVals = NULL; SArray* pVals = NULL;
SArray* crTblArray = NULL; SArray* crTblArray = NULL;
for (int32_t i = 0; i < blockSz; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid);
tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr);
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
taosArrayDestroy(deleteReq.deleteReqs);
continue;
}
int32_t len;
int32_t code;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code != TSDB_CODE_SUCCESS) {
qError("s-task:%s failed to encode delete request", pTask->id.idStr);
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
taosArrayDestroy(deleteReq.deleteReqs);
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead) };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
SVCreateTbBatchReq reqs = {0}; SVCreateTbBatchReq reqs = {0};
crTblArray = reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) { if (NULL == reqs.pArray) {
goto _end; goto _end;
} }
@ -209,10 +187,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
} }
STagVal tagVal = { STagVal tagVal = {
.cid = pTSchema->numOfCols + 1, .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
@ -254,7 +229,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
goto _end; goto _end;
} }
pCreateTbReq->ctb.pTag = (uint8_t*)pTag; pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name // set table name
@ -271,228 +245,18 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) { if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
} }
tagArray = taosArrayDestroy(tagArray); tagArray = taosArrayDestroy(tagArray);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
crTblArray = NULL; crTblArray = NULL;
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) { } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue; continue;
} else { } else {
SSubmitTbData tbData = {0}; code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask);
tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
goto _end;
}
tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version;
STableSinkInfo* pTableSinkInfo = NULL;
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
if (res != TSDB_CODE_SUCCESS) {
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo));
}
char* ctbName = pDataBlock->info.parTbName;
if (!ctbName[0]) {
memset(ctbName, 0, TSDB_TABLE_NAME_LEN);
if (res == TSDB_CODE_SUCCESS) {
memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
} else {
buildCtbNameByGroupIdImpl(stbFullName, pDataBlock->info.id.groupId, ctbName);
memcpy(pTableSinkInfo->tbName, ctbName, strlen(ctbName));
tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
pDataBlock->info.id.groupId);
}
}
if (res == TSDB_CODE_SUCCESS) {
tbData.uid = pTableSinkInfo->uid;
} else {
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
SVCreateTbReq* pCreateTbReq = NULL;
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
goto _end;
};
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
// set tag content
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
.cid = pTSchema->numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
// set table name
pCreateTbReq->name = taosStrdup(ctbName);
tbData.pCreateTbReq = pCreateTbReq;
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
} else {
if (mr.me.type != TSDB_CHILD_TABLE) {
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
mr.me.type);
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
continue;
}
if (mr.me.ctbEntry.suid != suid) {
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
", actual suid %" PRId64 "",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
continue;
}
tbData.uid = mr.me.uid;
pTableSinkInfo->uid = mr.me.uid;
int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo);
}
metaReaderClear(&mr);
}
}
// rows
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(tbData.aRowP);
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
goto _end;
}
for (int32_t j = 0; j < rows; j++) {
taosArrayClear(pVals);
int32_t dataIndex = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k];
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
dataIndex++;
} else {
void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
// address copy, no value
SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)};
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
SValue sv;
memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
dataIndex++;
}
}
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
taosArrayPush(tbData.aRowP, &pRow);
}
SSubmitReq2 submitReq = {0};
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
taosArrayPush(submitReq.aSubmitTbData, &tbData);
// encode
int32_t len;
int32_t code;
tEncodeSize(tEncodeSubmitReq, &submitReq, len, code);
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
goto _end;
}
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSubmitReq(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
tEncoderClear(&encoder);
rpcFreeCont(pBuf);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
continue;
}
tEncoderClear(&encoder);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put into write-queue since %s", terrstr());
}
} }
} }
tqDebug("vgId:%d, s-task:%s write results completed", TD_VID(pVnode), pTask->id.idStr); tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr);
_end: _end:
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
@ -500,3 +264,377 @@ _end:
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
// TODO: change // TODO: change
} }
int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) {
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
int32_t code = tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
taosArrayDestroy(deleteReq.deleteReqs);
return TSDB_CODE_SUCCESS;
}
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code != TSDB_CODE_SUCCESS) {
qError("s-task:%s failed to encode delete request", pTask->id.idStr);
return code;
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
taosArrayDestroy(deleteReq.deleteReqs);
((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);
SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
return TSDB_CODE_SUCCESS;
}
static bool isValidDestChildTable(SMetaReader* pReader, int32_t vgId, char* ctbName, int64_t suid) {
if (pReader->me.type != TSDB_CHILD_TABLE) {
tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
return false;
}
if (pReader->me.ctbEntry.suid != suid) {
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64,
vgId, ctbName, suid, pReader->me.ctbEntry.suid);
return false;
}
return true;
}
static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) {
char* ctbName = pDataBlock->info.parTbName;
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
if (pCreateTbReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// set tag content
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if (tagArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
return NULL;
}
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
taosArrayDestroy(tagArray);
if (pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
taosArrayPush(tagName, "group_id");
pCreateTbReq->ctb.tagName = tagName;
// set table name
pCreateTbReq->name = taosStrdup(ctbName);
return pCreateTbReq;
}
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, uint64_t uid,
const char* id) {
pTableSinkInfo->uid = uid;
int32_t code = tqPutTableInfo(pSinkTableMap, groupId, pTableSinkInfo);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo);
tqError("s-task:%s failed to put tableSinkInfo in to cache, code:%s", id, tstrerror(code));
} else {
tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
}
return code;
}
int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock,
SStreamTask* pTask) {
int32_t numOfRows = pDataBlock->info.rows;
int32_t vgId = TD_VID(pVnode);
uint64_t groupId = pDataBlock->info.id.groupId;
STSchema* pTSchema = pTask->tbSink.pTSchema;
int32_t code = TSDB_CODE_SUCCESS;
void* pBuf = NULL;
SArray* pVals = NULL;
const char* id = pTask->id.idStr;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
tqDebug("s-task:%s sink data pipeline, build submit msg from %d-th resBlock, including %d rows, dst suid:%" PRId64,
id, blockIndex + 1, numOfRows, suid);
tbData.aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
if (tbData.aRowP == NULL || pVals == NULL) {
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
code = TSDB_CODE_OUT_OF_MEMORY;
tqError("s-task:%s vgId:%d failed to prepare write stream res blocks, code:%s", id, vgId, tstrerror(code));
return code;
}
STableSinkInfo* pTableSinkInfo = NULL;
bool exist = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo);
char* dstTableName = pDataBlock->info.parTbName;
if (exist) {
if (dstTableName[0] == 0) {
tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
dstTableName);
} else {
if (pTableSinkInfo->uid != 0) {
tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
dstTableName, pTableSinkInfo->uid);
} else {
tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
id, numOfRows, groupId, dstTableName);
}
}
} else { // not exist
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
int32_t nameLen = strlen(dstTableName);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);
pTableSinkInfo->name.len = nameLen;
memcpy(pTableSinkInfo->name.data, dstTableName, nameLen);
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
}
if (exist) {
tbData.uid = pTableSinkInfo->uid;
if (tbData.uid == 0) {
tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
}
while (pTableSinkInfo->uid == 0) {
// wait for the table to be created
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
code = metaGetTableEntryByName(&mr, dstTableName);
if (code == 0) { // table alreay exists, check its type and uid
bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid);
if (!isValid) { // not valid table, ignore it
metaReaderClear(&mr);
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
} else {
tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
tbData.uid = mr.me.uid;
pTableSinkInfo->uid = mr.me.uid;
metaReaderClear(&mr);
}
} else { // not exist, wait and retry
metaReaderClear(&mr);
taosMsleep(100);
tqDebug("s-task:%s wait for the table:%s ready before insert data", id, dstTableName);
}
}
} else {
// todo: this check is not safe, and results in losing of submit message from WAL.
// The auto-create option will always set to be open for those submit messages, which arrive during the period
// the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
// data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
// those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
// randomly generated false table uid in the WAL.
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
// table not in cache, let's try the extract it from tsdb meta
if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
metaReaderClear(&mr);
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock);
if (tbData.pCreateTbReq == NULL) {
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
return terrno;
}
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, 0, id);
} else {
bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid);
if (!isValid) {
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
} else {
tbData.uid = mr.me.uid;
metaReaderClear(&mr);
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, tbData.uid, id);
}
}
}
// rows
for (int32_t j = 0; j < numOfRows; j++) {
taosArrayClear(pVals);
int32_t dataIndex = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k];
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("s-task:%s tq sink pipe2, row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
dataIndex++;
} else {
void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
// address copy, no value
SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)};
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
SValue sv;
memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
dataIndex++;
}
}
}
SRow* pRow = NULL;
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
if (code != TSDB_CODE_SUCCESS) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
return code;
}
ASSERT(pRow);
taosArrayPush(tbData.aRowP, &pRow);
}
SSubmitReq2 submitReq = {0};
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(submitReq.aSubmitTbData, &tbData);
// encode
int32_t len = 0;
tEncodeSize(tEncodeSubmitReq, &submitReq, len, code);
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
}
((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSubmitReq(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
tEncoderClear(&encoder);
rpcFreeCont(pBuf);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return code;
}
tEncoderClear(&encoder);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len };
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if(code == TSDB_CODE_SUCCESS) {
tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, pTableSinkInfo->name.data, numOfRows);
} else {
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
}
taosArrayDestroy(pVals);
return code;
}

View File

@ -221,7 +221,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
int32_t code = 0; int32_t code = 0;
int32_t type = pTask->outputInfo.type; int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__TABLE) { if (type == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
destroyStreamDataBlock(pBlock); destroyStreamDataBlock(pBlock);
} else if (type == TASK_OUTPUT__SMA) { } else if (type == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);

View File

@ -254,7 +254,16 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
void streamMetaClear(SStreamMeta* pMeta) { void streamMetaClear(SStreamMeta* pMeta) {
void* pIter = NULL; void* pIter = NULL;
while ((pIter = taosHashIterate(pMeta->pTasks, pIter)) != NULL) { while ((pIter = taosHashIterate(pMeta->pTasks, pIter)) != NULL) {
streamMetaReleaseTask(pMeta, *(SStreamTask**)pIter); SStreamTask* p = *(SStreamTask**)pIter;
// release the ref by timer
if (p->triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
taosTmrStop(p->schedTimer);
p->triggerParam = 0;
streamMetaReleaseTask(pMeta, p);
}
streamMetaReleaseTask(pMeta, p);
} }
taosRemoveRef(streamBackendId, pMeta->streamBackendRid); taosRemoveRef(streamBackendId, pMeta->streamBackendRid);