Merge branch 'main' into test/update_push_message

This commit is contained in:
Ping Xiao 2023-05-06 13:53:52 +08:00
commit 5d643fcf91
45 changed files with 432 additions and 326 deletions

View File

@ -212,14 +212,6 @@ enum {
FETCH_TYPE__NONE,
};
typedef struct {
int8_t fetchType;
union {
SSDataBlock data;
void* meta;
};
} SFetchRet;
typedef struct SVarColAttr {
int32_t* offset; // start position for each entry in the list
uint32_t length; // used buffer size that contain the valid data

View File

@ -415,7 +415,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
return pSW;
}
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
static FORCE_INLINE void tDeleteSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
if (pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
@ -3421,10 +3421,10 @@ typedef struct {
char data[]; // SSubmitReq2
} SSubmitReq2Msg;
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySSubmitReq(SSubmitReq2* pReq, int32_t flag);
int32_t tEncodeSubmitReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq);
void tDestroySubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySubmitReq(SSubmitReq2* pReq, int32_t flag);
typedef struct {
int32_t affectedRows;

View File

@ -192,8 +192,6 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
void qStreamSetOpen(qTaskInfo_t tinfo);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);

View File

@ -340,6 +340,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SArray* pTaskList; // SArray<task_id*>
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;

View File

@ -147,8 +147,6 @@ typedef struct SWalReader {
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
// int8_t curInvalid;
// int8_t curStopped;
TdThreadMutex mutex;
SWalFilterCond cond;
// TODO remove it

View File

@ -261,6 +261,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_MND_INVALID_STABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x036D) // 2.x
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E)
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F)
#define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW TAOS_DEF_ERROR_CODE(0, 0x0370)
// mnode-func

View File

@ -191,7 +191,7 @@ void taos_free_result(TAOS_RES *res) {
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->rsp.blockDataLen);
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
// taosx
taosArrayDestroy(pRsp->rsp.createTableLen);
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
@ -204,7 +204,7 @@ void taos_free_result(TAOS_RES *res) {
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->rsp.blockDataLen);
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp);

View File

@ -325,7 +325,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
@ -895,7 +895,7 @@ int stmtExec(TAOS_STMT* stmt) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else {
tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));

View File

@ -864,7 +864,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->dataRsp.blockDataLen);
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);
@ -877,7 +877,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
// taosx
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);

View File

@ -1053,9 +1053,9 @@ TEST(clientCase, sub_db_test) {
}
TEST(clientCase, sub_tb_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "/home/tests/dir/cfg/");
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("vm116", "root", "taosdata", NULL, 0);
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new();
@ -1091,7 +1091,7 @@ TEST(clientCase, sub_tb_test) {
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 25000;
int32_t timeout = 2500000;
int32_t count = 0;
@ -1117,10 +1117,10 @@ TEST(clientCase, sub_tb_test) {
fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes);
totalRows += 1;
if (totalRows % 100000 == 0) {
// if (totalRows % 100000 == 0) {
taos_print_row(buf, row, fields, numOfFields);
printf("row content: %s\n", buf);
}
// }
}
taos_free_result(pRes);

View File

@ -2374,7 +2374,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
@ -2388,7 +2388,7 @@ _end:
if (terrno != 0) {
*ppReq = NULL;
if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pReq);
}

View File

@ -1509,7 +1509,9 @@ void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid) {
// STSchema ========================================
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
if (pTSchema == NULL) return NULL;
if (pTSchema == NULL) {
return NULL;
}
pTSchema->numOfCols = numOfCols;
pTSchema->version = version;

View File

@ -7058,7 +7058,7 @@ void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL;
@ -7159,7 +7159,7 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL;
@ -7332,7 +7332,7 @@ _exit:
return 0;
}
int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) {
int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1;
@ -7344,7 +7344,7 @@ int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) {
return 0;
}
int32_t tDecodeSSubmitReq2(SDecoder *pCoder, SSubmitReq2 *pReq) {
int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) {
int32_t code = 0;
memset(pReq, 0, sizeof(*pReq));
@ -7387,7 +7387,7 @@ _exit:
return code;
}
void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
if (NULL == pTbData) {
return;
}
@ -7433,14 +7433,14 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
}
}
void tDestroySSubmitReq(SSubmitReq2 *pReq, int32_t flag) {
void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) {
if (pReq->aSubmitTbData == NULL) return;
int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData);
SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData);
for (int32_t i = 0; i < nSubmitTbData; i++) {
tDestroySSubmitTbData(&aSubmitTbData[i], flag);
tDestroySubmitTbData(&aSubmitTbData[i], flag);
}
taosArrayDestroy(pReq->aSubmitTbData);
pReq->aSubmitTbData = NULL;

View File

@ -50,6 +50,8 @@ void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *p
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup);
#ifdef __cplusplus
}
#endif

View File

@ -797,6 +797,11 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
return -1;
}
if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
SField *pField = taosArrayGet(pCreate->pColumns, i);
SSchema *pSchema = &pDst->pColumns[i];
@ -927,6 +932,11 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
return -1;
}
if(pDst->nextColId < 0 && pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
SField *pField = taosArrayGet(createReq->pColumns, i);
SSchema *pSchema = &pDst->pColumns[i];
@ -1154,6 +1164,11 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
return -1;
}
if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ntags){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
for (int32_t i = 0; i < ntags; i++) {
SField *pField = taosArrayGet(pFields, i);
if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
@ -1461,6 +1476,11 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return -1;
}
if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ncols){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
for (int32_t i = 0; i < ncols; i++) {
SField *pField = taosArrayGet(pFields, i);
if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {

View File

@ -197,7 +197,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebSub;
}
static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
const char *pSubKey = pOutput->pSub->key;
@ -339,7 +339,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// 2. check and get actual removed consumers, put their vg into pHash
doRemoveExistedConsumers(pOutput, pHash, pInput);
doRemoveLostConsumers(pOutput, pHash, pInput);
// 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
addUnassignedVgroups(pOutput, pHash);

View File

@ -2006,7 +2006,7 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
return 0;
}
static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1;
STrans *pTrans = NULL;
SSdbRaw *pRaw = NULL;

View File

@ -153,11 +153,15 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
if (code < 0) {
taosWUnLockLatch(&pSnode->pMeta->lock);
return -1;
}
taosWUnLockLatch(&pSnode->pMeta->lock);
// 3.go through recover steps to fill history
if (pTask->fillHistory) {
streamSetParamForRecover(pTask);

View File

@ -231,7 +231,7 @@ typedef struct SSnapContext {
} SSnapContext;
typedef struct STqReader {
SPackedData msg2;
SPackedData msg;
SSubmitReq2 submit;
int32_t nextBlk;
int64_t lastBlkUid;
@ -241,8 +241,9 @@ typedef struct STqReader {
SArray *pColIdList; // SArray<int16_t>
int32_t cachedSchemaVer;
int64_t cachedSchemaSuid;
int64_t cachedSchemaUid;
SSchemaWrapper *pSchemaWrapper;
STSchema *pSchema;
SSDataBlock *pResBlock;
} STqReader;
STqReader *tqReaderOpen(SVnode *pVnode);
@ -255,6 +256,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock);
int32_t tqNextBlockInWal(STqReader* pReader);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);

View File

@ -193,7 +193,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.

View File

@ -639,7 +639,6 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL;
SSchema *pSchema = NULL;
pSW = metaGetTableSchema(pMeta, uid, sver, lock);
if (!pSW) return NULL;

View File

@ -217,8 +217,8 @@ typedef struct STableInfoForChildTable {
static void destroySTableInfoForChildTable(void* data) {
STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
taosMemoryFree(pData->tableName);
tDeleteSSchemaWrapper(pData->schemaRow);
tDeleteSSchemaWrapper(pData->tagRow);
tDeleteSchemaWrapper(pData->schemaRow);
tDeleteSchemaWrapper(pData->tagRow);
}
static void MoveToSnapShotVersion(SSnapContext* ctx) {

View File

@ -673,8 +673,8 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
metaUpdateUidIdx(pMeta, &nStbEntry);
metaULock(pMeta);
tDeleteSSchemaWrapper(tag);
tDeleteSSchemaWrapper(row);
tDeleteSchemaWrapper(tag);
tDeleteSchemaWrapper(row);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
tDecoderClear(&dc);

View File

@ -684,7 +684,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8
" failed since %s",
@ -696,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
}

View File

@ -299,7 +299,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
}
SRow *pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
taosArrayPush(tbData.aRowP, &pRow);
@ -309,7 +309,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
}
// encode
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
tEncodeSize(tEncodeSubmitReq, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
@ -321,7 +321,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
if (tEncodeSubmitReq(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
/*vError("failed to encode submit req since %s", terrstr());*/
}
@ -332,7 +332,7 @@ _end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}

View File

@ -781,13 +781,17 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
tDecoderClear(&decoder);
// 2.save task, use the newest commit version as the initial start version of stream task.
taosWLockLatch(&pTq->pStreamMeta->lock);
code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
streamMetaGetNumOfTasks(pTq->pStreamMeta));
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1;
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
// 3.go through recover steps to fill history
if (pTask->fillHistory) {
streamTaskCheckDownstream(pTask, sversion);
@ -1323,13 +1327,12 @@ FAIL:
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exists", vgId);
taosWUnLockLatch(&pTq->pStreamMeta->lock);

View File

@ -262,14 +262,15 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
// taosWUnLockLatch(&pTq->lock);
}
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks));
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
// push data for stream processing:
// 1. the vnode has already been restored.
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
if (numOfTasks == 0) {
return 0;
}
@ -287,7 +288,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
}
int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) {
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pHandle = (STqHandle*) handle;
if(pHandle->msg == NULL){

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tmsg.h"
#include "tq.h"
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
@ -265,9 +266,9 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
pReader->pColIdList = NULL;
pReader->cachedSchemaVer = 0;
pReader->cachedSchemaSuid = 0;
pReader->pSchema = NULL;
pReader->pSchemaWrapper = NULL;
pReader->tbIdHash = NULL;
pReader->pResBlock = createDataBlock();
return pReader;
}
@ -276,19 +277,19 @@ void tqCloseReader(STqReader* pReader) {
if (pReader->pWalReader) {
walCloseReader(pReader->pWalReader);
}
// free cached schema
if (pReader->pSchema) {
taosMemoryFree(pReader->pSchema);
}
if (pReader->pSchemaWrapper) {
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
}
if (pReader->pColIdList) {
taosArrayDestroy(pReader->pColIdList);
}
// free hash
blockDataDestroy(pReader->pResBlock);
taosHashCleanup(pReader->tbIdHash);
tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
taosMemoryFree(pReader);
}
@ -322,9 +323,85 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) {
return 0;
}
// todo ignore the error in wal?
int32_t tqNextBlockInWal(STqReader* pReader) {
SWalReader* pWalReader = pReader->pWalReader;
while(1) {
SArray* pBlockList = pReader->submit.aSubmitTbData;
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
// try next message in wal file
if (walNextValidMsg(pWalReader) < 0) {
return FETCH_TYPE__NONE;
}
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pWalReader->pHead->head.version;
SDecoder decoder = {0};
tDecoderInit(&decoder, pBody, bodyLen);
{
int32_t nSubmitTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
for (int32_t i = 0; i < nSubmitTbData; i++) {
SSubmitTbData* pData = taosArrayGet(pReader->submit.aSubmitTbData, i);
if (pData->pCreateTbReq != NULL) {
taosArrayDestroy(pData->pCreateTbReq->ctb.tagName);
taosMemoryFreeClear(pData->pCreateTbReq);
}
pData->aRowP = taosArrayDestroy(pData->aRowP);
}
pReader->submit.aSubmitTbData = taosArrayDestroy(pReader->submit.aSubmitTbData);
}
if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
tDecoderClear(&decoder);
tqError("decode wal file error, msgLen:%d, ver:%"PRId64, bodyLen, ver);
return FETCH_TYPE__NONE;
}
tDecoderClear(&decoder);
pReader->nextBlk = 0;
}
size_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg.msgStr, pReader->msg.msgLen,
pReader->msg.ver, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) {
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
return FETCH_TYPE__DATA;
}
}
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("tq reader return submit block, uid:%"PRId64", ver:%"PRId64, pSubmitTbData->uid, pReader->msg.ver);
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
return FETCH_TYPE__DATA;
}
} else {
pReader->nextBlk += 1;
tqDebug("tq reader discard submit block, uid:%"PRId64", continue", pSubmitTbData->uid);
}
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->msg.msgStr = NULL;
}
}
int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) {
while (1) {
if (pReader->msg2.msgStr == NULL) {
if (pReader->msg.msgStr == NULL) {
if (walNextValidMsg(pReader->pWalReader) < 0) {
return FETCH_TYPE__NONE;
}
@ -337,8 +414,7 @@ int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) {
}
while (tqNextBlockImpl(pReader)) {
memset(pBlock, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock(pBlock, pReader, NULL);
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) {
continue;
}
@ -349,31 +425,33 @@ int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) {
}
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
pReader->msg2.msgStr = msgStr;
pReader->msg2.msgLen = msgLen;
pReader->msg2.ver = ver;
pReader->msg.msgStr = msgStr;
pReader->msg.msgLen = msgLen;
pReader->msg.ver = ver;
tqDebug("tq reader set msg %p %d", msgStr, msgLen);
SDecoder decoder;
tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen);
if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) {
tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
tDecoderClear(&decoder);
tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%"PRId64, msgLen, ver);
return -1;
}
tDecoderClear(&decoder);
return 0;
}
bool tqNextBlockImpl(STqReader* pReader) {
if (pReader->msg2.msgStr == NULL) {
if (pReader->msg.msgStr == NULL) {
return false;
}
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) {
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
pReader->msg2.ver, pReader->nextBlk);
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg.msgStr, pReader->msg.msgLen,
pReader->msg.ver, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) {
@ -382,7 +460,7 @@ bool tqNextBlockImpl(STqReader* pReader) {
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("tq reader block found, ver:%"PRId64", uid:%"PRId64, pReader->msg2.ver, pSubmitTbData->uid);
tqDebug("tq reader block found, ver:%"PRId64", uid:%"PRId64, pReader->msg.ver, pSubmitTbData->uid);
return true;
} else {
tqDebug("tq reader discard submit block, uid:%"PRId64", continue", pSubmitTbData->uid);
@ -391,15 +469,15 @@ bool tqNextBlockImpl(STqReader* pReader) {
pReader->nextBlk++;
}
tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL;
pReader->msg.msgStr = NULL;
return false;
}
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
if (pReader->msg2.msgStr == NULL) return false;
if (pReader->msg.msgStr == NULL) return false;
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) {
@ -413,9 +491,9 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
pReader->nextBlk++;
}
tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL;
pReader->msg.msgStr = NULL;
return false;
}
@ -450,131 +528,182 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
}
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg2.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
if (pSubmitTbDataRet) {
*pSubmitTbDataRet = pSubmitTbData;
}
blockDataCleanup(pBlock);
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;
pBlock->info.id.uid = uid;
pBlock->info.version = pReader->msg2.ver;
pBlock->info.version = pReader->msg.ver;
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || pReader->cachedSchemaSuid != suid) {
taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) || (pReader->cachedSchemaVer != sversion)) {
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, suid, uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
STSchema* pTschema = pReader->pSchema;
pReader->cachedSchemaUid = uid;
pReader->cachedSchemaSuid = suid;
pReader->cachedSchemaVer = sversion;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
if (blockDataGetNumOfCols(pBlock) > 0) {
blockDataDestroy(pReader->pResBlock);
pReader->pResBlock = createDataBlock();
pBlock = pReader->pResBlock;
int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);
pBlock->info.id.uid = uid;
pBlock->info.version = pReader->msg.ver;
}
if (colNumNeed == 0) {
int32_t colMeta = 0;
while (colMeta < pSchemaWrapper->nCols) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
int32_t numOfCols = taosArrayGetSize(pReader->pColIdList);
if (numOfCols == 0) { // all columns are required
for (int32_t i = 0; i < pSchemaWrapper->nCols; ++i) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[i];
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
blockDataFreeRes(pBlock);
return -1;
}
colMeta++;
}
} else {
if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols;
if (numOfCols > pSchemaWrapper->nCols) {
numOfCols = pSchemaWrapper->nCols;
}
int32_t colMeta = 0;
int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
int32_t i = 0;
int32_t j = 0;
while (i < pSchemaWrapper->nCols && j < numOfCols) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[i];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
if (colIdSchema < colIdNeed) {
colMeta++;
i++;
} else if (colIdSchema > colIdNeed) {
colNeed++;
j++;
} else {
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}
colMeta++;
colNeed++;
i++;
j++;
}
}
}
}
int32_t numOfRows = 0;
int32_t numOfRows = 0;
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
numOfRows = pCol->nVal;
} else {
numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
}
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
SColData* pCol = taosArrayGet(pCols, 0);
numOfRows = pCol->nVal;
} else {
SArray* pRows = pSubmitTbData->aRowP;
numOfRows = taosArrayGetSize(pRows);
if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pBlock->info.rows = numOfRows;
int32_t colActual = blockDataGetNumOfCols(pBlock);
// convert and scan one block
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
int32_t numOfCols = taosArrayGetSize(pCols);
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
while (targetIdx < colActual) {
if (sourceIdx >= numOfCols) {
tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
goto FAIL;
}
SColData* pCol = taosArrayGet(pCols, sourceIdx);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal;
if (pCol->nVal != numOfRows) {
tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
goto FAIL;
}
if (pCol->cid < pColData->info.colId) {
sourceIdx++;
} else if (pCol->cid == pColData->info.colId) {
for (int32_t i = 0; i < pCol->nVal; i++) {
tColDataGetValue(pCol, i, &colVal);
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2] = {0};
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
} else {
colDataSetNULL(pColData, i);
}
} else {
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}
}
sourceIdx++;
targetIdx++;
} else {
for (int32_t i = 0; i < pCol->nVal; i++) {
colDataSetNULL(pColData, i);
}
targetIdx++;
}
}
} else {
SArray* pRows = pSubmitTbData->aRowP;
SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
STSchema* pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pBlock->info.rows = numOfRows;
int32_t colActual = blockDataGetNumOfCols(pBlock);
// convert and scan one block
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
int32_t numOfCols = taosArrayGetSize(pCols);
int32_t targetIdx = 0;
for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGetP(pRows, i);
int32_t sourceIdx = 0;
while (targetIdx < colActual) {
if(sourceIdx >= numOfCols){
tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
goto FAIL;
}
SColData* pCol = taosArrayGet(pCols, sourceIdx);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal;
if(pCol->nVal != numOfRows){
tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
goto FAIL;
}
for (int32_t j = 0; j < colActual; j++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
while (1) {
SColVal colVal;
tqDebug("start to extract column id:%d, index:%d", pColData->info.colId, sourceIdx);
if (pCol->cid < pColData->info.colId) {
sourceIdx++;
} else if (pCol->cid == pColData->info.colId) {
for (int32_t i = 0; i < pCol->nVal; i++) {
tColDataGetValue(pCol, i, &colVal);
tRowGet(pRow, pTSchema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) {
tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d",
sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
sourceIdx++;
continue;
} else if (colVal.cid == pColData->info.colId) {
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2] = {0};
@ -591,59 +720,18 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
goto FAIL;
}
}
}
sourceIdx++;
targetIdx++;
} else {
for (int32_t i = 0; i < pCol->nVal; i++) {
sourceIdx++;
break;
} else {
colDataSetNULL(pColData, i);
}
targetIdx++;
}
}
} else {
SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGetP(pRows, i);
int32_t sourceIdx = 0;
for (int32_t j = 0; j < colActual; j++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
while (1) {
SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) {
sourceIdx++;
continue;
} else if (colVal.cid == pColData->info.colId) {
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2] = {0};
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
} else {
colDataSetNULL(pColData, i);
}
} else {
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}
sourceIdx++;
break;
} else {
colDataSetNULL(pColData, i);
break;
}
break;
}
}
}
}
taosMemoryFreeClear(pTSchema);
}
return 0;
@ -654,7 +742,7 @@ FAIL:
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk);
tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
@ -665,18 +753,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;
taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
@ -686,7 +763,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
return -1;
}
STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t numOfRows = 0;
@ -743,18 +819,18 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
tDeleteSchemaWrapper(pSW);
goto FAIL;
}
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
block.info.id.uid = uid;
block.info.version = pReader->msg2.ver;
block.info.version = pReader->msg.ver;
if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
tDeleteSchemaWrapper(pSW);
goto FAIL;
}
taosArrayPush(blocks, &block);
@ -803,14 +879,17 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
curRow++;
}
} else {
SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
STSchema* pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGetP(pRows, i);
bool buildNew = false;
for (int32_t j = 0; j < pTschema->numOfCols; j++) {
for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
SColVal colVal;
tRowGet(pRow, pTschema, j, &colVal);
tRowGet(pRow, pTSchema, j, &colVal);
if (curRow == 0) {
assigned[j] = !COL_VAL_IS_NONE(&colVal);
buildNew = true;
@ -839,18 +918,18 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
tDeleteSchemaWrapper(pSW);
goto FAIL;
}
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
block.info.id.uid = uid;
block.info.version = pReader->msg2.ver;
block.info.version = pReader->msg.ver;
if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
tDeleteSchemaWrapper(pSW);
goto FAIL;
}
taosArrayPush(blocks, &block);
@ -868,7 +947,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
while (targetIdx < colActual) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal);
tRowGet(pRow, pTSchema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) {
sourceIdx++;
@ -895,6 +974,8 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
}
curRow++;
}
taosMemoryFreeClear(pTSchema);
}
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);

View File

@ -57,42 +57,28 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
return 0;
}
static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t));
void* pIter = NULL;
taosWLockLatch(&pStreamMeta->lock);
while(1) {
pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
taosArrayPush(pTaskIdList, &pTask->id.taskId);
}
taosWUnLockLatch(&pStreamMeta->lock);
return pTaskIdList;
}
int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true;
bool noNewDataInWal = true;
int32_t vgId = pStreamMeta->vgId;
int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks);
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
taosWLockLatch(&pStreamMeta->lock);
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
taosWUnLockLatch(&pStreamMeta->lock);
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);
// update the new task number
numOfTasks = taosArrayGetSize(pTaskIdList);
numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskIdList, i);
int32_t* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
if (pTask == NULL) {
continue;
@ -166,7 +152,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true;
}
taosArrayDestroy(pTaskIdList);
taosArrayDestroy(pTaskList);
return 0;
}

View File

@ -215,7 +215,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;
@ -274,7 +274,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;

View File

@ -672,7 +672,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
@ -681,7 +681,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SSubmitReq2 submitReq = {0};
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
@ -690,28 +690,28 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
// encode
int32_t len;
int32_t code;
tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code);
tEncodeSize(tEncodeSubmitReq, &submitReq, len, code);
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
goto _end;
}
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) {
if (tEncodeSubmitReq(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
tEncoderClear(&encoder);
rpcFreeCont(pBuf);
tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
continue;
}
tEncoderClear(&encoder);
tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,

View File

@ -180,7 +180,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
// lock
taosWLockLatch(&pTq->lock);
code = tqRegisterPushEntry(pTq, pHandle, pMsg);
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
return code;

View File

@ -1007,7 +1007,7 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
}
tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema);
tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
if (NULL == pCxt->pTbData) {
pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (NULL == pCxt->pTbData) {
@ -1039,7 +1039,7 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
taosMemoryFreeClear(pCxt->pTbSchema);
tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pCxt->pTbData);
taosArrayDestroy(pCxt->pColValues);
}
@ -1149,7 +1149,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
int32_t code = TSDB_CODE_SUCCESS;
char *pMsg = NULL;
uint32_t msglen = 0;
tEncodeSize(tEncodeSSubmitReq2, pSubmitReq, msglen, code);
tEncodeSize(tEncodeSubmitReq, pSubmitReq, msglen, code);
if (TSDB_CODE_SUCCESS == code) {
pMsg = taosMemoryMalloc(msglen);
if (NULL == pMsg) {
@ -1159,7 +1159,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder;
tEncoderInit(&encoder, pMsg, msglen);
code = tEncodeSSubmitReq2(&encoder, pSubmitReq);
code = tEncodeSubmitReq(&encoder, pSubmitReq);
tEncoderClear(&encoder);
}
if (TSDB_CODE_SUCCESS == code) {
@ -1199,7 +1199,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
len -= sizeof(SSubmitReq2Msg);
SDecoder dc = {0};
tDecoderInit(&dc, pReq, len);
if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) {
if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
@ -1388,7 +1388,7 @@ _exit:
// clear
taosArrayDestroy(newTbUids);
tDestroySSubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
tDestroySubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
if (code) terrno = code;

View File

@ -126,7 +126,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
int32_t code = TSDB_CODE_SUCCESS;
int32_t len = 0;
void* pBuf = NULL;
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
tEncodeSize(tEncodeSubmitReq, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
@ -138,7 +138,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
code = tEncodeSSubmitReq2(&encoder, pReq);
code = tEncodeSubmitReq(&encoder, pReq);
tEncoderClear(&encoder);
}
@ -281,7 +281,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
taosArrayPush(tbData.aRowP, &pRow);
@ -301,7 +301,7 @@ _end:
if (terrno != 0) {
*ppReq = NULL;
if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
return terrno;
@ -326,7 +326,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid);
if (code) {
if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
@ -335,7 +335,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
}
code = submitReqToMsg(vgId, pReq, pMsg, msgLen);
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
return code;

View File

@ -1052,19 +1052,6 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return TSDB_CODE_SUCCESS;
}
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if ((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)) {
qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
qDebug("set the submit block for future scan");
pTaskInfo->streamInfo.submit = submit;
return 0;
}
void qStreamSetOpen(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
@ -1086,6 +1073,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (pOperator == NULL) {
return -1;
}
SStreamScanInfo* pInfo = pOperator->info;
STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
STableScanBase* pScanBaseInfo = &pScanInfo->base;
@ -1221,7 +1209,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema;
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);

View File

@ -109,8 +109,8 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo) {
taosMemoryFreeClear(pSchemaInfo->dbname);
taosMemoryFreeClear(pSchemaInfo->tablename);
tDeleteSSchemaWrapper(pSchemaInfo->sw);
tDeleteSSchemaWrapper(pSchemaInfo->qsw);
tDeleteSchemaWrapper(pSchemaInfo->sw);
tDeleteSchemaWrapper(pSchemaInfo->qsw);
}
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo) {
@ -197,7 +197,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
return pqSw;
}
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSchemaWrapper(pStreamInfo->schema); }
static void freeBlock(void* pParam) {
SSDataBlock* pBlock = *(SSDataBlock**)pParam;

View File

@ -1623,7 +1623,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock);
// blockDataFreeRes((SSDataBlock*)pBlock);
calBlockTbName(pInfo, pInfo->pRes);
return 0;
@ -1637,7 +1637,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("start to exec queue scan, %s", id);
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) {
if (pInfo->tqReader->msg.msgStr == NULL) {
SPackedData submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
@ -1649,21 +1649,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextBlockImpl(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, &block, true);
setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, true);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
}
}
pInfo->tqReader->msg2 = (SPackedData){0};
pInfo->tqReader->msg = (SPackedData){0};
pTaskInfo->streamInfo.submit = (SPackedData){0};
return NULL;
}
@ -1689,20 +1687,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) {
SSDataBlock block = {0};
int32_t type = tqNextBlock(pInfo->tqReader, &block);
int32_t type = tqNextBlockInWal(pInfo->tqReader);
SSDataBlock* pRes = pInfo->tqReader->pResBlock;
// curVersion move to next, so currentOffset = curVersion - 1
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);
if (type == FETCH_TYPE__DATA) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, block.info.rows,
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, &block, true);
setBlockIntoRes(pInfo, pRes, true);
if (pInfo->pRes->info.rows > 0) {
qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);
return pInfo->pRes;
}
} else if (type == FETCH_TYPE__NONE) {
@ -2055,7 +2051,7 @@ FETCH_NEXT_BLOCK:
NEXT_SUBMIT_BLK:
while (1) {
if (pInfo->tqReader->msg2.msgStr == NULL) {
if (pInfo->tqReader->msg.msgStr == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
@ -2075,14 +2071,12 @@ FETCH_NEXT_BLOCK:
blockDataCleanup(pInfo->pRes);
while (tqNextBlockImpl(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, &block, false);
setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false);
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
pInfo->pRes->info.version)) {
@ -2191,7 +2185,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
}
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
tDeleteSSchemaWrapper(mtInfo.schema);
tDeleteSchemaWrapper(mtInfo.schema);
return NULL;
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
SSnapContext* sContext = pInfo->sContext;

View File

@ -313,7 +313,7 @@ void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);
if (pTableCxt->pData) {
tDestroySSubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pTableCxt->pData);
}
taosMemoryFree(pTableCxt);
@ -324,7 +324,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
return;
}
tDestroySSubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pVgCxt->pData);
taosMemoryFree(pVgCxt);
}
@ -499,7 +499,7 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin
int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0;
void* pBuf = NULL;
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
tEncodeSize(tEncodeSubmitReq, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
@ -511,7 +511,7 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
code = tEncodeSSubmitReq2(&encoder, pReq);
code = tEncodeSubmitReq(&encoder, pReq);
tEncoderClear(&encoder);
}

View File

@ -57,6 +57,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
// task list
pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t));
if (pMeta->pTaskList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (streamMetaBegin(pMeta) < 0) {
goto _err;
}
@ -70,6 +77,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
_err:
taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
@ -100,6 +108,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup(pMeta->pTasks);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta);
}
@ -180,11 +189,15 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
}
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
return 0;
}
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
return (int32_t) taosHashGetSize(pMeta->pTasks);
size_t size = taosHashGetSize(pMeta->pTasks);
ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
return (int32_t) size;
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
@ -224,6 +237,15 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
for(int32_t i = 0; i < num; ++i) {
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (*pTaskId == taskId) {
taosArrayRemove(pMeta->pTaskList, i);
break;
}
}
streamMetaReleaseTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
}
@ -308,6 +330,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
if (pTask->fillHistory) {
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
streamTaskCheckDownstream(pTask, ver);

View File

@ -193,7 +193,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);
}

View File

@ -1814,6 +1814,11 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
*ppVal = pVal;
*vLen = cd.vLen;
} else {
if (TDB_CELLDECODER_FREE_VAL(&cd)) {
tdbTrace("tdb/btree-next2 decoder: %p pVal free: %p", &cd, cd.pVal);
tdbFree(cd.pVal);
}
}
ret = tdbBtcMoveToNext(pBtc);

View File

@ -203,6 +203,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow")
// mnode-func
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_NAME, "Invalid func name")

View File

@ -353,7 +353,8 @@ def main():
else:
core_dir = "none"
text = f'''exit status: {msg_dict[status]}
text = f'''
exit status: {msg_dict[status]}
test scope: crash_gen
owner: pxiao
hostname: {hostname}
@ -362,7 +363,8 @@ def main():
git commit : {git_commit}
log dir: {log_dir}
core dir: {core_dir}
cmd: {cmd}'''
cmd: {cmd}
'''
send_msg(get_msg(text))
except Exception as e:

View File

@ -388,7 +388,8 @@ def main():
else:
core_dir = "none"
text = f'''exit status: {msg_dict[status]}
text = f'''
exit status: {msg_dict[status]}
test scope: crash_gen
owner: pxiao
hostname: {hostname}

View File

@ -388,7 +388,8 @@ def main():
else:
core_dir = "none"
text = f'''exit status: {msg_dict[status]}
text = f'''
exit status: {msg_dict[status]}
test scope: crash_gen
owner: pxiao
hostname: {hostname}