remove logs
This commit is contained in:
parent
a70b4e28c8
commit
2ddd07142a
|
@ -1233,7 +1233,6 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
rc = metaDropTableByUid(pMeta, uid, &type, &suid, &sysTbl);
|
rc = metaDropTableByUid(pMeta, uid, &type, &suid, &sysTbl);
|
||||||
metaInfo("wjm meta drop table by uid: %"PRId64, uid);
|
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
if (rc < 0) goto _exit;
|
if (rc < 0) goto _exit;
|
||||||
|
|
|
@ -53,7 +53,7 @@ static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
|
||||||
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
|
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
|
||||||
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
|
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
|
||||||
int64_t earlyTs);
|
int64_t earlyTs);
|
||||||
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName, int64_t uid);
|
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
|
||||||
|
|
||||||
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
const char* pIdStr, bool newSubTableRule) {
|
const char* pIdStr, bool newSubTableRule) {
|
||||||
|
@ -442,7 +442,7 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
|
||||||
SVDropTbBatchReq batchReq = {0};
|
SVDropTbBatchReq batchReq = {0};
|
||||||
SVDropTbReq req = {0};
|
SVDropTbReq req = {0};
|
||||||
|
|
||||||
if (rows <= 0 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
|
if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
|
batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
|
||||||
if (!batchReq.pArray) return terrno;
|
if (!batchReq.pArray) return terrno;
|
||||||
|
@ -451,15 +451,12 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
|
||||||
req.igNotExists = true;
|
req.igNotExists = true;
|
||||||
|
|
||||||
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||||
SColumnInfoData* pUidCol = taosArrayGet(pDataBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
||||||
char tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
|
char tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
|
||||||
for (int32_t i = 0; i < rows; ++i) {
|
for (int32_t i = 0; i < rows; ++i) {
|
||||||
void* pData = colDataGetVarData(pTbNameCol, i);
|
void* pData = colDataGetVarData(pTbNameCol, i);
|
||||||
memcpy(tbName, varDataVal(pData), varDataLen(pData));
|
memcpy(tbName, varDataVal(pData), varDataLen(pData));
|
||||||
tbName[varDataLen(pData) + 1] = 0;
|
tbName[varDataLen(pData) + 1] = 0;
|
||||||
req.name = tbName;
|
req.name = tbName;
|
||||||
// TODO wjm remove uid, it's not my uid
|
|
||||||
req.uid = *(int64_t*)colDataGetData(pUidCol, i);
|
|
||||||
if (taosArrayPush(batchReq.pArray, &req) == NULL) {
|
if (taosArrayPush(batchReq.pArray, &req) == NULL) {
|
||||||
TSDB_CHECK_CODE(terrno, lino, _exit);
|
TSDB_CHECK_CODE(terrno, lino, _exit);
|
||||||
}
|
}
|
||||||
|
@ -467,9 +464,9 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
|
||||||
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
|
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
|
||||||
// TODO wjm handle only one table
|
// only one row
|
||||||
code = metaGetTableEntryByName(&mr, tbName);
|
code = metaGetTableEntryByName(&mr, tbName);
|
||||||
if (isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
|
if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
|
||||||
STableSinkInfo* pTableSinkInfo = NULL;
|
STableSinkInfo* pTableSinkInfo = NULL;
|
||||||
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
|
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
|
||||||
if (alreadyCached) {
|
if (alreadyCached) {
|
||||||
|
@ -485,9 +482,7 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
|
||||||
void* pData = colDataGetVarData(pTbNameCol, i);
|
void* pData = colDataGetVarData(pTbNameCol, i);
|
||||||
memcpy(tbName, varDataVal(pData), varDataLen(pData));
|
memcpy(tbName, varDataVal(pData), varDataLen(pData));
|
||||||
tbName[varDataLen(pData) + 1] = 0;
|
tbName[varDataLen(pData) + 1] = 0;
|
||||||
int64_t uid = *(int64_t*)colDataGetData(pUidCol, i);
|
code = doWaitForDstTableDropped(pVnode, pTask, tbName);
|
||||||
// TODO wjm remove uid it's not my uid
|
|
||||||
code = doWaitForDstTableDropped(pVnode, pTask, tbName, uid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -906,7 +901,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName, int64_t uid) {
|
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
|
||||||
int32_t vgId = TD_VID(pVnode);
|
int32_t vgId = TD_VID(pVnode);
|
||||||
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
@ -920,7 +915,6 @@ static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, cons
|
||||||
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
|
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
|
||||||
int32_t code = metaGetTableEntryByName(&mr, dstTableName);
|
int32_t code = metaGetTableEntryByName(&mr, dstTableName);
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
tqDebug("wjm s-task:%s table:%s has been dropped", id, dstTableName);
|
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
break;
|
break;
|
||||||
} else if (TSDB_CODE_SUCCESS == code) {
|
} else if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -929,7 +923,6 @@ static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, cons
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
|
tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("wjm s-task:%s table:%s exist, but not mine", id, dstTableName);
|
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -483,75 +483,6 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodePreProcessDropTSmaCtbMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|
||||||
SVDropTbBatchReq dropReq = {0};
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
SDecoder dc = {0};
|
|
||||||
SEncoder ec = {0};
|
|
||||||
int32_t nTbs = 0;
|
|
||||||
SDeleteRes res = {0};
|
|
||||||
int32_t size = 0;
|
|
||||||
uint8_t *pCont = NULL;
|
|
||||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
|
||||||
if (tDecodeSVDropTbBatchReq(&dc, &dropReq) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
nTbs = dropReq.nReqs;
|
|
||||||
res.skey = INT64_MIN;
|
|
||||||
res.ekey = INT64_MAX;
|
|
||||||
res.affectedRows = 1;
|
|
||||||
res.uidList = taosArrayInit(nTbs, sizeof(tb_uid_t));
|
|
||||||
if (!res.uidList) {
|
|
||||||
code = terrno;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
vDebug("vnode preprocess drop tsma ctb, vgId:%d tb num: %d", TD_VID(pVnode), nTbs);
|
|
||||||
for (int32_t i = 0; i < nTbs; ++i) {
|
|
||||||
SVDeleteRsp rsp = {.affectedRows = 1};
|
|
||||||
tb_uid_t uid = metaGetTableEntryUidByName(pVnode->pMeta, dropReq.pReqs[i].name);
|
|
||||||
if (uid == 0) {
|
|
||||||
vWarn("vgId:%d, drop tsma ctb:%s not found", TD_VID(pVnode), dropReq.pReqs[i].name);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (NULL == taosArrayPush(res.uidList, &uid)) {
|
|
||||||
code = terrno;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tEncodeSize(tEncodeDeleteRes, &res, size, code);
|
|
||||||
pCont = rpcMallocCont(size + sizeof(SMsgHead));
|
|
||||||
if (!pCont) {
|
|
||||||
code = terrno;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
|
|
||||||
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
|
||||||
|
|
||||||
tEncoderInit(&ec, pCont + sizeof(SMsgHead), size);
|
|
||||||
code = tEncodeDeleteRes(&ec, &res);
|
|
||||||
tEncoderClear(&ec);
|
|
||||||
if (code != 0) {
|
|
||||||
vError("vgId:%d %s failed to encode delete response", TD_VID(pVnode), __func__);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = pCont;
|
|
||||||
pCont = NULL;
|
|
||||||
pMsg->contLen = size + sizeof(SMsgHead);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (res.uidList) {
|
|
||||||
taosArrayDestroy(res.uidList);
|
|
||||||
}
|
|
||||||
tDecoderClear(&dc);
|
|
||||||
rpcFreeCont(pCont);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vnodePreProcessDropTbMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
int32_t vnodePreProcessDropTbMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1294,11 +1225,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("wjm process create tb req:%s, uid: %"PRId64, pCreateReq->name, pCreateReq->uid);
|
|
||||||
// do create table
|
// do create table
|
||||||
if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
|
if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
|
||||||
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||||
vInfo("wjm already exists-----------------");
|
|
||||||
cRsp.code = TSDB_CODE_SUCCESS;
|
cRsp.code = TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
cRsp.code = terrno;
|
cRsp.code = terrno;
|
||||||
|
@ -1375,7 +1304,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
vInfo("wjm process create table request exit");
|
|
||||||
tDeleteSVCreateTbBatchReq(&req);
|
tDeleteSVCreateTbBatchReq(&req);
|
||||||
taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
|
taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
|
||||||
taosArrayDestroy(tbUids);
|
taosArrayDestroy(tbUids);
|
||||||
|
|
|
@ -1326,7 +1326,6 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
||||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||||
code = pAPI->streamStateGetParName(pState, groupId, &pValue, true, &winCode);
|
code = pAPI->streamStateGetParName(pState, groupId, &pValue, true, &winCode);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
qInfo("wjm group id: %"PRId64 " winCode: %d, block type: %d", groupId, winCode, pSrcBlock->info.type);
|
|
||||||
if (winCode != TSDB_CODE_SUCCESS) {
|
if (winCode != TSDB_CODE_SUCCESS) {
|
||||||
SSDataBlock* pTmpBlock = NULL;
|
SSDataBlock* pTmpBlock = NULL;
|
||||||
code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock);
|
code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock);
|
||||||
|
@ -1335,8 +1334,6 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
||||||
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
pTmpBlock->info.id.groupId = groupId;
|
pTmpBlock->info.id.groupId = groupId;
|
||||||
char* tbName = pSrcBlock->info.parTbName;
|
char* tbName = pSrcBlock->info.parTbName;
|
||||||
printSpecDataBlock(pSrcBlock, "wjm", "recv", "wjm");
|
|
||||||
printSpecDataBlock(pTmpBlock, "wjm", "recv", "wjm");
|
|
||||||
if (pTableSup->numOfExprs > 0) {
|
if (pTableSup->numOfExprs > 0) {
|
||||||
code = projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs,
|
code = projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs,
|
||||||
NULL);
|
NULL);
|
||||||
|
@ -1344,19 +1341,15 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
||||||
|
|
||||||
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||||
QUERY_CHECK_NULL(pTbCol, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pTbCol, code, lino, _end, terrno);
|
||||||
printSpecDataBlock(pSrcBlock, "wjm", "recv", "wjm");
|
|
||||||
printSpecDataBlock(pTmpBlock, "wjm", "recv", "wjm");
|
|
||||||
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
|
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
|
if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
|
||||||
qInfo("wjm calculated tbnameis null");
|
|
||||||
len = 1;
|
len = 1;
|
||||||
tbName[0] = 0;
|
tbName[0] = 0;
|
||||||
} else {
|
} else {
|
||||||
void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
|
void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
|
||||||
len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||||
memcpy(tbName, varDataVal(pData), len);
|
memcpy(tbName, varDataVal(pData), len);
|
||||||
qInfo("wjm calculated tbname: %s", tbName);
|
|
||||||
code = pAPI->streamStatePutParName(pState, groupId, tbName);
|
code = pAPI->streamStatePutParName(pState, groupId, tbName);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3542,11 +3542,10 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32
|
||||||
int64_t* gpIdCol = (int64_t*)pGpIdCol->pData;
|
int64_t* gpIdCol = (int64_t*)pGpIdCol->pData;
|
||||||
void* pParName = NULL;
|
void* pParName = NULL;
|
||||||
int32_t winCode = 0;
|
int32_t winCode = 0;
|
||||||
// TODO wjm test remove non stream child tables
|
|
||||||
code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i],
|
code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i],
|
||||||
&pParName, false, &winCode);
|
&pParName, false, &winCode);
|
||||||
if (TSDB_CODE_SUCCESS == code && winCode != 0) {
|
if (TSDB_CODE_SUCCESS == code && winCode != 0) {
|
||||||
qInfo("delete stream part Name for:%"PRId64 " not found", gpIdCol[i]);
|
qDebug("delete stream part Name for:%"PRId64 " not found", gpIdCol[i]);
|
||||||
colDataSetNULL(pTbnameCol, i);
|
colDataSetNULL(pTbnameCol, i);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -758,7 +758,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
||||||
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
|
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
|
||||||
hashValue =
|
hashValue =
|
||||||
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
|
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
|
||||||
stInfo("wjm ctbname for dispatch: %s, pDataBlock.info.parTbName: %s", ctbName, pDataBlock->info.parTbName);
|
|
||||||
SBlockName bln = {0};
|
SBlockName bln = {0};
|
||||||
bln.hashValue = hashValue;
|
bln.hashValue = hashValue;
|
||||||
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
|
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
|
||||||
|
|
|
@ -480,7 +480,6 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char
|
||||||
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
|
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
|
||||||
if (tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
if (tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
||||||
code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
|
code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
|
||||||
qInfo("wjm put group id into parnamemap: %"PRId64 " cur mapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap));
|
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
code = streamStatePutParName_rocksdb(pState, groupId, tbname);
|
code = streamStatePutParName_rocksdb(pState, groupId, tbname);
|
||||||
|
@ -506,7 +505,6 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
|
||||||
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
|
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
|
||||||
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
||||||
code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);
|
code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);
|
||||||
qInfo("wjm put group id into parnamemap: %"PRId64 " cur mapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap));
|
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -528,7 +526,6 @@ _end:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) {
|
int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) {
|
||||||
qTrace("wjm delete par for group:%"PRId64 " parnameMapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap));
|
|
||||||
int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t));
|
int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t));
|
||||||
qTrace("catche %s at line %d res %d", __func__, __LINE__, code);
|
qTrace("catche %s at line %d res %d", __func__, __LINE__, code);
|
||||||
code = streamStateDeleteParName_rocksdb(pState, groupId);
|
code = streamStateDeleteParName_rocksdb(pState, groupId);
|
||||||
|
|
|
@ -1237,10 +1237,11 @@ class TDTestCase:
|
||||||
clust_dnode_nums = len(cluster_dnode_list)
|
clust_dnode_nums = len(cluster_dnode_list)
|
||||||
if clust_dnode_nums > 1:
|
if clust_dnode_nums > 1:
|
||||||
self.test_redistribute_vgroups()
|
self.test_redistribute_vgroups()
|
||||||
|
tdSql.execute("drop tsma test.tsma5")
|
||||||
|
for _ in range(4):
|
||||||
self.test_td_32519()
|
self.test_td_32519()
|
||||||
|
|
||||||
def test_td_32519(self):
|
def test_td_32519(self):
|
||||||
tdSql.execute("drop tsma test.tsma5")
|
|
||||||
self.create_recursive_tsma('tsma1', 'tsma_r', 'test', '1h', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'])
|
self.create_recursive_tsma('tsma1', 'tsma_r', 'test', '1h', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'])
|
||||||
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||||
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||||
|
@ -1269,6 +1270,7 @@ class TDTestCase:
|
||||||
sql = 'select * from test.`163b7c69922cf6d83a98bfa44e52dade`'
|
sql = 'select * from test.`163b7c69922cf6d83a98bfa44e52dade`'
|
||||||
self.wait_query(sql, 2, 20) ## tsma_r output ctb for t1
|
self.wait_query(sql, 2, 20) ## tsma_r output ctb for t1
|
||||||
tdSql.checkData(0, 1, 1)
|
tdSql.checkData(0, 1, 1)
|
||||||
|
self.drop_tsma('tsma_r', 'test')
|
||||||
|
|
||||||
def test_create_tsma(self):
|
def test_create_tsma(self):
|
||||||
function_name = sys._getframe().f_code.co_name
|
function_name = sys._getframe().f_code.co_name
|
||||||
|
|
Loading…
Reference in New Issue