Merge branch '3.0' into feat/sangshuduo/TD-14141-update-taostools-for3.0

This commit is contained in:
Shuduo Sang 2022-08-04 10:26:31 +08:00
commit 0376e23b6e
21 changed files with 307 additions and 274 deletions

View File

@ -227,8 +227,7 @@ typedef struct SSubmitBlk {
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
int16_t padding; // TODO just for padding here
int32_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
@ -256,7 +255,7 @@ typedef struct {
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
int32_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk
int32_t numOfBlocks;
const void* pMsg;

View File

@ -76,7 +76,7 @@ void taos_cleanup(void) {
cleanupTaskQueue();
taosConvDestroy();
tscInfo("all local resources released");
taosCleanupCfg();
taosCloseLog();
@ -680,7 +680,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery;
if (pQuery->pRoot) {
pRequest->stmtType = pQuery->pRoot->type;
pRequest->stmtType = pQuery->pRoot->type;
}
}
@ -785,9 +785,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
if (NULL == pQuery->pRoot) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
}
}
@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob);
pCxt = NULL;
if (code == TSDB_CODE_SUCCESS) {
return;
}
@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
_error:
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId);
taosMemoryFree(pCxt);
terrno = code;
pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
@ -857,7 +860,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
}
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);

View File

@ -3144,10 +3144,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(pTableMeta->sversion);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htons(rows);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks = 1;
@ -3373,10 +3372,9 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(pSW->version);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htons(rows);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks++;

View File

@ -2028,11 +2028,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk->numOfRows = htonl(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
} else {

View File

@ -76,7 +76,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
pIter->sversion = htonl((*pPBlock)->sversion);
pIter->dataLen = htonl((*pPBlock)->dataLen);
pIter->schemaLen = htonl((*pPBlock)->schemaLen);
pIter->numOfRows = htons((*pPBlock)->numOfRows);
pIter->numOfRows = htonl((*pPBlock)->numOfRows);
}
return 0;
}

View File

@ -149,7 +149,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
SMnode *pMnode = pFsm->data;
return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
pSnapshot->lastConfigIndex);

View File

@ -117,7 +117,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = htobe64(suid);

View File

@ -111,7 +111,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
// pBlock->sversion = htonl(pBlock->sversion);
// pBlock->dataLen = htonl(pBlock->dataLen);
// pBlock->schemaLen = htonl(pBlock->schemaLen);
// pBlock->numOfRows = htons(pBlock->numOfRows);
// pBlock->numOfRows = htonl(pBlock->numOfRows);
#if 0
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {

View File

@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);

View File

@ -395,9 +395,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
pBlk->uid = htobe64(tbUid);
pBlk->suid = htobe64(tbUid);
pBlk->sversion = htonl(schemaVer);
pBlk->padding = htonl(0);
pBlk->schemaLen = htonl(0);
pBlk->numOfRows = htons(mockRowNum);
pBlk->numOfRows = htonl(mockRowNum);
pBlk->dataLen = htonl(mockRowNum * mockRowLen);
for (uint32_t r = 0; r < mockRowNum; ++r) {
pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen);

File diff suppressed because it is too large Load Diff

View File

@ -225,7 +225,7 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
}
blkHead->dataLen = htonl(dataLen);
blkHead->numOfRows = htons(rows);
blkHead->numOfRows = htonl(rows);
ret->length += sizeof(SSubmitBlk) + dataLen;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);

View File

@ -686,7 +686,6 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->len = 0;
}
}
fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
}
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {

View File

@ -122,7 +122,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *
pBlocks->sversion = dataBuf->pTableMeta->sversion;
pBlocks->schemaLen = dataBuf->createTbReqLen;
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
pBlocks->numOfRows += numOfRows;

View File

@ -292,11 +292,10 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
int32_t schemaLen = blk->schemaLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk->numOfRows = htonl(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
}
}
@ -1267,7 +1266,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
dataBuf->numOfTables = 1;
@ -1339,7 +1338,7 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
dataBuf->numOfTables = 1;
@ -1986,7 +1985,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;
@ -2074,7 +2073,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
}
@ -2444,7 +2443,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;

View File

@ -5845,11 +5845,16 @@ static int32_t createTagValFromExpr(STranslateContext* pCxt, SDataType targetDt,
}
static int32_t createTagValFromVal(STranslateContext* pCxt, SDataType targetDt, SNode* pNode, SValueNode** pVal) {
*pVal = (SValueNode*)nodesCloneNode(pNode);
if (NULL == *pVal) {
SValueNode* pTempVal = (SValueNode*)nodesCloneNode(pNode);
if (NULL == pTempVal) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return DEAL_RES_ERROR == translateValueImpl(pCxt, *pVal, targetDt, true) ? pCxt->errCode : TSDB_CODE_SUCCESS;
if (DEAL_RES_ERROR == translateValueImpl(pCxt, pTempVal, targetDt, true)) {
nodesDestroyNode((SNode*)pTempVal);
return pCxt->errCode;
}
*pVal = pTempVal;
return TSDB_CODE_SUCCESS;
}
static int32_t createTagVal(STranslateContext* pCxt, uint8_t precision, SSchema* pSchema, SNode* pNode,

View File

@ -110,15 +110,15 @@ class InsertTest : public Test {
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
cout << "Block:" << i << endl;
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", padding:" << ntohl(blk->padding)
<< ", sversion:" << ntohl(blk->sversion) << ", dataLen:" << ntohl(blk->dataLen)
<< ", schemaLen:" << ntohl(blk->schemaLen) << ", numOfRows:" << ntohs(blk->numOfRows) << endl;
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion)
<< ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen)
<< ", numOfRows:" << ntohl(blk->numOfRows) << endl;
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}
}
void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) {
void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
@ -134,7 +134,7 @@ class InsertTest : public Test {
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
ASSERT_EQ(ntohs(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}

View File

@ -573,7 +573,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
walFsync(pWal, true);
walFsync(pWal, false);
// update match index
matchIndex = pMsg->prevLogIndex + pMsg->dataCount;
@ -694,7 +694,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
walFsync(pWal, true);
walFsync(pWal, false);
}
// prepare response msg

View File

@ -206,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SWal* pWal = pData->pWal;
SyncIndex index = 0;
SWalSyncInfo syncMeta;
SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
@ -444,7 +444,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SWal* pWal = pData->pWal;
SyncIndex index = 0;
SWalSyncInfo syncMeta;
SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;

View File

@ -132,8 +132,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
// SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
SyncIndex newNextIndex = nextIndex + 1;
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
// SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
@ -224,8 +225,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
// SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
SyncIndex newNextIndex = nextIndex + 1;
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
// SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64

View File

@ -0,0 +1,19 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400