fix: tsma result submit for row/column update
This commit is contained in:
parent
c684a7a334
commit
8905a4839e
|
@ -1879,7 +1879,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
|
||||||
char pBuf[128] = {0};
|
char pBuf[128] = {0};
|
||||||
int32_t sz = taosArrayGetSize(dataBlocks);
|
int32_t sz = taosArrayGetSize(dataBlocks);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SSDataBlock* pDataBlock = taosArrayGetP(dataBlocks, i);
|
SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
|
||||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
|
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
@ -1891,21 +1891,37 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
|
||||||
for (int32_t k = 0; k < numOfCols; k++) {
|
for (int32_t k = 0; k < numOfCols; k++) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||||
|
if (k == 0) {
|
||||||
|
printf("cols:%d |", (int32_t)numOfCols);
|
||||||
|
}
|
||||||
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
|
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
|
||||||
printf(" %15s |", "NULL");
|
printf(" %15s |", "NULL");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (pColInfoData->info.type) {
|
switch (pColInfoData->info.type) {
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
|
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
|
||||||
printf(" %25s |", pBuf);
|
printf(" %25s |", pBuf);
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
printf(" %15d |", *(int32_t*)var);
|
printf(" %15" PRIi8 " |", *(int8_t*)var);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
printf(" %15" PRIi8 " |", *(int8_t*)var);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
printf(" %15" PRIi8 " |", *(int16_t*)var);
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
printf(" %15d |", *(int32_t*)var);
|
printf(" %15d |", *(int32_t*)var);
|
||||||
break;
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
printf(" %15" PRIi8 " |", *(int8_t*)var);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
printf(" %15" PRIi8 " |", *(int16_t*)var);
|
||||||
|
break;
|
||||||
case TSDB_DATA_TYPE_UINT:
|
case TSDB_DATA_TYPE_UINT:
|
||||||
printf(" %15u |", *(uint32_t*)var);
|
printf(" %15u |", *(uint32_t*)var);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -241,7 +241,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
||||||
SRpcMsg submitReqMsg = {
|
SRpcMsg submitReqMsg = {
|
||||||
.msgType = TDMT_VND_SUBMIT,
|
.msgType = TDMT_VND_SUBMIT,
|
||||||
.pCont = pSubmitReq,
|
.pCont = pSubmitReq,
|
||||||
.contLen = ntohl(contLen),
|
.contLen = contLen,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) {
|
if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) {
|
||||||
|
|
|
@ -426,7 +426,7 @@ int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* p
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
for (int32_t j = 0; j < rows; ++j) {
|
||||||
taosArrayClear(pVals);
|
taosArrayClear(pVals);
|
||||||
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||||
const STColumn* pCol = &pTSchema->columns[k];
|
const STColumn* pCol = &pTSchema->columns[k];
|
||||||
|
@ -469,7 +469,7 @@ int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* p
|
||||||
if (NULL == pBuf) {
|
if (NULL == pBuf) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode));
|
((SMsgHead*)pBuf)->vgId = TD_VID(pVnode);
|
||||||
((SMsgHead*)pBuf)->contLen = htonl(len);
|
((SMsgHead*)pBuf)->contLen = htonl(len);
|
||||||
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
|
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
|
||||||
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
|
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
|
||||||
|
|
Loading…
Reference in New Issue