Definition at line 729 of file IOBuffer.h.

References Available(), Impala::Util::Channel::Buffer(), Impala::Util::Channel::DATA_BUFFER_SIZE, GetPosition(), ILOG_DEBUG, ILOG_ERROR, Impala::Util::Channel::LastSendHadError(), mDataChannel, mDataChannelFile, Read(), Impala::Util::Channel::SendRequest(), SetPosition(), and SetSize().

Referenced by CheckDataChannelWrite() and ~IOBuffer().

```cpp
00730 {
00731     SetSize(GetPosition()); // assume the last write designates the end
00732     char* buf = mDataChannel->Buffer();
00733     sprintf(buf, "openfilebuffer \"%s\" 0\0", mDataChannelFile.c_str());
00734     mDataChannel->SendRequest(strlen(buf)+1);
00735     if (mDataChannel->LastSendHadError())
00736     {
00737         ILOG_ERROR("WriteIOBufferToChannel: open failed for [" <<
00738                    mDataChannelFile << "]");
00739         return false;
00740     }
00741     int id;
00742     Int64 bufSize;
00743     sscanf(buf, "%d %lld", &id, &bufSize); // bufSize is dummy when writing
00744     ILOG_DEBUG(mDataChannelFile << ", id = " << id);
00745
00746     Int64 nrWritten = 0;
00747     SetPosition(0);
00748     while (Available() > 0)
00749     {
00750         sprintf(buf, "writefilebuffer %d\0", id);
00751         int used = strlen(buf) + 1;
00752         int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE-used, Available());
00753         Read(buf+used, nrData);
00754         int len = mDataChannel->SendRequest(used+nrData);
00755         if (mDataChannel->LastSendHadError())
00756         {
00757             ILOG_ERROR("WriteIOBufferToChannel: write failed for [" <<
00758                        mDataChannelFile << "]");
00759             break;
00760         }
00761         nrWritten += nrData;
00762         ILOG_DEBUG(" wrote " << nrData << " bytes, total = " << nrWritten);
00763     }
00764     sprintf(buf, "closefilebuffer %d\0", id);
00765     mDataChannel->SendRequest(strlen(buf)+1);
00766     return true;
00767 }
```
Here is the call graph for this function: ![Call graph]()
|