Merge branch 'feature/qnode' of https://github.com/taosdata/TDengine into feature/qnode
This commit is contained in:
commit
1ea3afe7a6
|
@ -89,7 +89,7 @@ extern char *qtypeStr[];
|
||||||
|
|
||||||
#define TSDB_PORT_HTTP 11
|
#define TSDB_PORT_HTTP 11
|
||||||
|
|
||||||
#undef TD_DEBUG_PRINT_ROW
|
#define TD_DEBUG_PRINT_ROW
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -326,6 +326,11 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
|
||||||
|
|
||||||
tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);
|
tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);
|
||||||
|
|
||||||
|
#ifdef TD_DEBUG_PRINT_ROW
|
||||||
|
printf("!!! %s:%d table %" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, pTbData->uid,
|
||||||
|
SL_SIZE(pTbData->pData));
|
||||||
|
#endif
|
||||||
|
|
||||||
// Set statistics
|
// Set statistics
|
||||||
keyMax = TD_ROW_KEY(blkIter.row);
|
keyMax = TD_ROW_KEY(blkIter.row);
|
||||||
|
|
||||||
|
|
|
@ -497,6 +497,47 @@ _exit:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char* tags) {
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
SMeta *pMeta = pVnode->pMeta;
|
||||||
|
SSubmitBlk *pBlock = NULL;
|
||||||
|
SSubmitBlkIter blkIter = {0};
|
||||||
|
STSRow *row = NULL;
|
||||||
|
STSchema *pSchema = NULL;
|
||||||
|
tb_uid_t suid = 0;
|
||||||
|
|
||||||
|
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||||
|
while (true) {
|
||||||
|
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||||
|
if (pBlock == NULL) break;
|
||||||
|
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||||
|
if (blkIter.row == NULL) continue;
|
||||||
|
if (!pSchema || (suid != msgIter.suid)) {
|
||||||
|
if (pSchema) {
|
||||||
|
taosMemoryFreeClear(pSchema);
|
||||||
|
}
|
||||||
|
pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema
|
||||||
|
if(pSchema) {
|
||||||
|
suid = msgIter.suid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(!pSchema) {
|
||||||
|
printf("%s:%d no valid schema\n", tags, __LINE__);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
char __tags[128] = {0};
|
||||||
|
snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter.uid);
|
||||||
|
while ((row = tGetSubmitBlkNext(&blkIter))) {
|
||||||
|
tdSRowPrint(row, pSchema, __tags);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pSchema);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
@ -508,6 +549,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
||||||
|
|
||||||
pRsp->code = 0;
|
pRsp->code = 0;
|
||||||
|
|
||||||
|
#ifdef TD_DEBUG_PRINT_ROW
|
||||||
|
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
|
||||||
|
#endif
|
||||||
|
|
||||||
// handle the request
|
// handle the request
|
||||||
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
|
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
|
||||||
pRsp->code = TSDB_CODE_INVALID_MSG;
|
pRsp->code = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -551,6 +596,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
||||||
}
|
}
|
||||||
|
|
||||||
rsp.affectedRows += nRows;
|
rsp.affectedRows += nRows;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
Loading…
Reference in New Issue