diff --git a/.gitignore b/.gitignore index 88faa0e2b8..d3a9ddb12a 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,6 @@ charmrun ampirun pgm *.swp + +#Ignore the generated headers dir +src/libs/ck-libs/io/headers# diff --git a/doc/libraries/manual.rst b/doc/libraries/manual.rst index 7908b9c0bc..507ed9a17e 100644 --- a/doc/libraries/manual.rst +++ b/doc/libraries/manual.rst @@ -974,7 +974,7 @@ The following functions comprise the interface to the library for parallel file This method is invoked to read data asynchronously from the read session. This method returns immediately to the caller, but the read is only guaranteed complete once the callback ``after_read`` is called. Internally, the read request is buffered until the Buffer Chares can respond with the requested data. After the read finishes, the - after_read callback is invoked taking a ReadCompleteMsg* which points to a vector buffer, the offset, + after_read callback is invoked taking a ReadCompleteMsg* which points to a char* buffer, the offset, and the number of bytes of the read. @@ -989,10 +989,74 @@ The following functions comprise the interface to the library for parallel file the ``FileReadyMsg`` sent to the ``opened`` callback after a file has been opened. This method should only be called from a single PE, once per file. +FileReader API +-------------- + +The FileReader API is an additional abstraction layer built on top of Ck::IO to support +streaming reads from a file and implement the callback internally. This API is designed to +match that of the c++ std::ifstream. Under the hood, when an application reads a small +number of bytes via a FileReader object, +the Buffer Chare will send a large chunk of data to the FileReader which can be buffered there +until the application requests more. + +- Creating a FileReader object: + + .. code-block:: c++ + + FileReader::FileReader(Ck::IO::Session session) + + Before creating a FileReader, the Ck::IO::Session must be created (see above). 
This session is + passed in to the FileReader constructor. + +- Reading data: + + .. code-block:: c++ + + FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read) + + Read the specified number of bytes from the file opened in the session. The data is read into the buffer. + This method is blocking and returns a reference to the FileReader object. + +- Seeking: + + There are two functions for seeking in the file, one that seeks to a given position, and one that seeks relative to a chosen reference point (the beginning, the current position, or the end). + + .. code-block:: c++ + + FileReader& FileReader::seekg(size_t pos) + + .. code-block:: c++ + + FileReader& FileReader::seekg(size_t pos, std::ios_base::seekdir dir) + + The options for std::ios_base::seekdir are std::ios_base::beg, std::ios_base::cur, and std::ios_base::end. + +- Tell functionality: + + .. code-block:: c++ + + size_t FileReader::tellg() + + This function returns the current position in the file. + +- End of file and gcount: + + .. code-block:: c++ + + bool FileReader::eof() + + This function returns true if the end of the file has been reached and false otherwise. + + .. code-block:: c++ + + size_t FileReader::gcount() + + This function returns the number of bytes read by the last read operation. + Examples -------- For example code showing how to use CkIO for output, see ``tests/charm++/io/``. -For example code showing how to use CkIO for input, see ``tests/charm++/io_read/``. +For example code showing how to use CkIO for input (including FileReader), see ``tests/charm++/io_read/``. 
diff --git a/src/libs/ck-libs/io/ckio.C b/src/libs/ck-libs/io/ckio.C index e967a2f9ae..b7f38b26cc 100644 --- a/src/libs/ck-libs/io/ckio.C +++ b/src/libs/ck-libs/io/ckio.C @@ -280,6 +280,8 @@ private: public: ReadAssembler(Session session) { _session = session; } + ReadAssembler(CkMigrateMessage* m) : CBase_ReadAssembler(m) {} + /* * This function adds the read request to the _read_info_buffer table * which maps a tag to a ReadInfo struct @@ -308,6 +310,11 @@ public: return _curr_read_tag - 1; } + void pup(PUP::er& p) + { + // TODO: All files must be closed across checkpoint/restart + } + void removeEntryFromReadTable(int tag) { _read_info_buffer.erase(tag); } /** @@ -989,6 +996,157 @@ public: int registerArray(CkArrayIndex& numElements, CkArrayID aid) { return 0; } }; +FileReader::FileReader(Ck::IO::Session session) + : _session_token(session), _offset(session.offset), _num_bytes(session.bytes) +{ +} + +FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read) +{ + if (_eofbit) + { // no more bytes to read + _gcount = 0; + return *this; + } + size_t amt_from_cache = _data_cache.getFromBuffer( + _curr_pos, num_bytes_to_read, buffer); // get whatever data the cache has for us + _curr_pos += amt_from_cache; + if (amt_from_cache == num_bytes_to_read) + { + return *this; + } + size_t bytes_to_read_left = num_bytes_to_read - amt_from_cache; + size_t bytes_to_read = std::min( + std::max(bytes_to_read_left, _data_cache.capacity()), + (_offset + _num_bytes - + _curr_pos)); // if the read is too small, get more data to store in the buffer + if (!bytes_to_read) + { + return *this; + } + char* tmp_data_buff = + new char[bytes_to_read]; // temporary buffer that will hold all the data + ReadCompleteMsg* read_msg; + Ck::IO::read(_session_token, bytes_to_read, _curr_pos, tmp_data_buff, + CkCallbackResumeThread((void*&)read_msg)); + // below will not get executed until the read is done + size_t bytes_read = read_msg->bytes; + if (bytes_read > bytes_to_read_left) + { + // 
if I read more bytes than what was left to read, that means I have extra bytes that + // the buffer can use + _data_cache.setBuffer(_curr_pos, bytes_read, tmp_data_buff); + _curr_pos += bytes_to_read_left; + std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_to_read_left); + } + else + { + // too many bytes, nothing to actually cache + _curr_pos += bytes_read; + std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_read); + } + delete[] tmp_data_buff; + if (_curr_pos >= (_offset + _num_bytes)) + { + _eofbit = true; // ran out of data to read + _curr_pos = _offset + _num_bytes; + } + _gcount = std::min(bytes_read, bytes_to_read_left); + return *this; +} + +// overload ! operator on filereader object +bool FileReader::operator!() const +{ + // CkPrintf("In overwritten operator\n"); + return false; +} + +size_t FileReader::tellg() { return _curr_pos; } + +FileReader& FileReader::seekg(size_t pos, std::ios_base::seekdir dir) +{ + if (dir == std::ios_base::beg) + { + _curr_pos = pos + _offset; + } + else if (dir == std::ios_base::cur) + { + _curr_pos += pos; + } + else if (dir == std::ios_base::end) + { + _curr_pos = _offset + _num_bytes - pos; + } + + _eofbit = false; + if (_curr_pos < _offset) + { + _curr_pos = _offset; + _eofbit = false; + } + else if (_curr_pos >= (_offset + _num_bytes)) + { + _curr_pos = _offset + _num_bytes; + _eofbit = true; + } + return *this; +} + +FileReader& FileReader::seekg(size_t pos) +{ + _curr_pos = pos; + if (_curr_pos < _offset) + { + _curr_pos = _offset; + _eofbit = false; + } + else if (_curr_pos >= (_offset + _num_bytes)) + { + _curr_pos = _offset + _num_bytes; + _eofbit = true; + return *this; + } + _eofbit = false; + return *this; +} + +bool FileReader::eof() { return _eofbit; } + +size_t FileReader::gcount() { return _gcount; } + +FileReaderBuffer::FileReaderBuffer() { _buffer = new char[_buff_capacity]; } + +FileReaderBuffer::FileReaderBuffer(size_t buff_capacity) +{ + _buff_capacity = buff_capacity; + _buffer = new 
char[_buff_capacity]; +} + +void FileReaderBuffer::setBuffer(size_t offset, size_t num_bytes, char* data) +{ + is_dirty = false; + _offset = offset; + _buff_size = std::min(_buff_capacity, num_bytes); + std::memcpy(_buffer, data, _buff_size); // keep at most _buff_capacity bytes from the front of data +} + +size_t FileReaderBuffer::getFromBuffer(size_t offset, size_t num_bytes, char* buffer) +{ + if (is_dirty || offset < _offset || offset >= (_offset + _buff_size)) + { + return 0; // miss: nothing cached yet, or offset lies outside the cached window + } + size_t cached_len = std::min(offset + num_bytes, _offset + _buff_size) - offset; + std::memcpy(buffer, _buffer + (offset - _offset), cached_len); + return cached_len; +} + +size_t FileReaderBuffer::capacity() { + return _buff_capacity; +} + +FileReaderBuffer::~FileReaderBuffer() { delete[] _buffer; } } // namespace IO } // namespace Ck diff --git a/src/libs/ck-libs/io/ckio.ci b/src/libs/ck-libs/io/ckio.ci index 3df960e375..1218c9a8c4 100644 --- a/src/libs/ck-libs/io/ckio.ci +++ b/src/libs/ck-libs/io/ckio.ci @@ -128,7 +128,7 @@ entry void addSessionReadAssemblerFinished(CkReductionMsg* msg); } // class tht will be used to assemble a specific read call -group ReadAssembler +group[migratable] ReadAssembler { // stores the parameters of the read call it is tasked with building entry ReadAssembler(Session session); diff --git a/src/libs/ck-libs/io/ckio.h b/src/libs/ck-libs/io/ckio.h index f3281427be..f301f7a4b0 100644 --- a/src/libs/ck-libs/io/ckio.h +++ b/src/libs/ck-libs/io/ckio.h @@ -2,6 +2,7 @@ #define CK_IO_H #include +#include #include #include #include @@ -26,7 +27,13 @@ namespace IO struct Options { Options() - : peStripe(0), writeStripe(0), activePEs(-1), basePE(-1), skipPEs(-1), numReaders(0) + : peStripe(0), + writeStripe(0), + activePEs(-1), + basePE(-1), + skipPEs(-1), + read_stride(0), + numReaders(0) { } @@ -40,6 +47,8 @@ struct Options int basePE; /// How should active PEs be spaced out? 
int skipPEs; + // How many bytes each Read Session should hold + size_t read_stride; // How many IO buffers should there be size_t numReaders; @@ -50,10 +59,13 @@ struct Options p | activePEs; p | basePE; p | skipPEs; + p | read_stride; p | numReaders; } }; +class FileReader; + class File; // class ReadAssembler; /// Open the named file on the selected subset of PEs, and send a @@ -91,10 +103,10 @@ void close(File file, CkCallback closed); /** * Prepare to read data from @arg file section specified by @arg bytes and @arg offset. - * On starting the session, the buffer chares begin eagerly reading all requested data - * into memory. The ready callback is invoked once all buffer chares have been created and - * their reads have been initiated (but the reads are not guaranteed to be complete at - * this point). + * This method will proceed to eagerly read all of the data in that window into memory + * for future read calls. After all the data is read in, the ready callback will be + * invoked. The ready callback will take in a SessionReadyMessage* that will contain the + * offset, the number of bytes, and the buffer in the form of a vector. */ void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready); @@ -116,11 +128,17 @@ void closeReadSession(Session read_session, CkCallback after_end); /** * Is a method that reads data from the @arg session of length @arg bytes at offset * @arg offset (in file). After this read finishes, the @arg after_read callback is - * invoked, taking a ReadCompleteMsg* which points to a vector buffer, the offset, + * invoked, taking a ReadCompleteMsg* which points to a char* buffer, the offset, * and the number of bytes of the read. 
* + */ void read(Session session, size_t bytes, size_t offset, char* data, CkCallback after_read); +void read(Session session, size_t bytes, size_t offset, CkCallback after_read, + size_t tag); + +// ZERO COPY READ; +void read(Session session, size_t bytes, size_t offset, CkCallback after_read, size_t tag, + char* user_buffer); class File { @@ -170,6 +188,7 @@ class Session friend void read(Session session, size_t bytes, size_t offset, char* data, CkCallback after_read); friend struct std::hash; + friend class FileReader; public: Session(int file_, size_t bytes_, size_t offset_, CkArrayID sessionID_) @@ -216,6 +235,147 @@ class ReadCompleteMsg : public CMessage_ReadCompleteMsg { } }; +/** + * This is used by the FileReader in order to try to minimize + * the number of network calls made during a read. Instead + * of calling Ck::IO::read repeatedly and each call has only a + * small amount of data, the FileReader will make a Ck::IO::read + * call with a larger amount of data and store that data in the + * FileReaderBuffer. This way, if the FileReader is making small + * read calls, the data will hopefully already be in the buffer, + * which prevents superfluous messages. This is NOT a user facing + * class and should not be used by the user. + */ +class FileReaderBuffer +{ + size_t _buff_capacity = 4096; // the size of the buffer array + size_t _buff_size = 0; // the number of valid elements in the array + size_t _offset = 0; // file offset of the first cached byte + char* _buffer; + bool is_dirty = true; + +public: + FileReaderBuffer(); + FileReaderBuffer(size_t buff_capacity); + ~FileReaderBuffer(); + /** + * Copies the @arg data into the head of _buffer + * until the _buffer is full or data has been fully copied. + * Will also set _buff_size to the number of bytes that was + * copied. + * + * @arg offset: the offset in the file the @arg data arguments are from. 
+ * @arg num_bytes: the length of @arg data + * @arg data: the array with the data to be put into the FileReaderBuffer + */ + void setBuffer(size_t offset, size_t num_bytes, + char* data); // writes the data to the buffer + /** + * This data checks whether, given a request specified by @arg offset + * and @arg num_bytes, can use some of its cached data to fulfill the + * request. This method changes the @arg buffer. + * + * @arg offset: the offset in the session the read request is. + * @arg num_bytes: the number of bytes of the read request. + * @arg buffer: the address of the buffer the read will go; + * this method will write to that address. + */ + size_t getFromBuffer(size_t offset, size_t num_bytes, char* buffer); + /** + * Returns the capacity of the internal buffer. + * @return size_t: the total capacity of _buffer. + */ + size_t capacity(); +}; +/** + * The Ck:IO equivalent to std::ifstream. If the user + * doesn't want to write callbacks after a lot of reads, + * or the user is making a series of very small sequential + * reads, this abstraction will make it very easy. FileReader + * uses caching in order to try and minimize the number of + * extraneous network calls made during a series of read requests. + * This class should be used in threaded entry methods. + */ +class FileReader +{ + Session _session_token; + size_t _curr_pos = 0; + size_t _offset, _num_bytes; + bool _eofbit = false; + size_t _gcount = 0; + FileReaderBuffer _data_cache; + bool _status = true; + +public: + std::ios_base::seekdir end = std::ios_base::end; + std::ios_base::seekdir cur = std::ios_base::cur; + std::ios_base::seekdir beg = std::ios_base::beg; + /** + * @arg Session: the session token the FileReader will use + */ + FileReader(Ck::IO::Session session); + /** + * Perform a request of size @arg num_bytes_to_read, with + * an offset of wherever the FileReader is in the stream. + * It will write the result to @arg buffer. 
+ * + * @arg buffer: the location where the read will be written to + * @arg num_bytes_to_read: the number of bytes to read + */ + FileReader& read(char* buffer, size_t num_bytes_to_read); + /** + * Returns the current position in the file the FileReader + * is i.e te next byte the read will start. + * + * @return size_t: the position the FileReader is at in the + * file + */ + size_t tellg(); + /** + * Seeks to a position in the file for the FileReader from the + * beginning of the read session. If the seek goes beyond the end of + * the read session, it will set the internal position to be one byte + * further than the end of session and the eof flag will be set. + * + * @arg pos: the position in the session wrt the beginning of + * the session to seek to. + */ + FileReader& seekg(size_t pos); + /** + * Seeks to a position in the file for the FileReader wrt the + * @arg dir specifies. If the seek goes beyond the end of the + * read session, it will set the internal position to be one byte + * further than the end of session and the eof flag will be set. + * + * @arg pos: the position in the session wrt what @arg dir + * the session to seek to. + * @arg dir: Where to seek with respect to. If dir=std::ios_base::beg, + * then it is with respect to the beginning of the file. If + * dir=std::ios_base::cur, it is with respect to the current + * position of the FileReader. If dir=std::ios_base_end, then + * it is with respect to the end of the stream. + */ + FileReader& seekg(size_t pos, std::ios_base::seekdir dir); + /** + * Returns whether the FileReader is at the end of the session. + * + * @return bool: whether the FileReader is at end of session. + */ + bool eof(); + /** + * Returns the number of bytes the last read did. + * @return size_t: the number of bytes the last read call did. + */ + size_t gcount(); + /** + * Will return true if the FileReader is on a bad file. 
+ * Currently this always returns false because we assume + * that the Session points to a good file. + * + * @return bool: false + */ + bool operator!() const; +}; } // namespace IO } // namespace Ck diff --git a/tests/charm++/io_read/iotest.C b/tests/charm++/io_read/iotest.C index 4f02616ced..7aa60adf7f 100644 --- a/tests/charm++/io_read/iotest.C +++ b/tests/charm++/io_read/iotest.C @@ -47,6 +47,7 @@ public: class Test : public CBase_Test { char* dataBuffer; + char* file_reader_buffer; int size; std::string _fname; @@ -55,10 +56,16 @@ public: { CkPrintf("Inside the constructor of tester %d\n", thisIndex); _fname = filename; + thisProxy[thisIndex].testMethod(token, bytesToRead); + } + + void testMethod(Ck::IO::Session token, size_t bytesToRead) + { CkCallback sessionEnd(CkIndex_Test::readDone(0), thisProxy[thisIndex]); try { dataBuffer = new char[bytesToRead]; + file_reader_buffer = new char[bytesToRead]; } catch (const std::bad_alloc& e) { @@ -66,10 +73,44 @@ public: bytesToRead, thisIndex); CkExit(); } + + // setup and read using Ck::IO::FileReader size = bytesToRead; + Ck::IO::FileReader fr(token); + fr.seekg(bytesToRead * thisIndex); // seek to the correct place in the file + fr.read(file_reader_buffer, + size); // should yield the same bytes as the plain Ck::IO::read below + CkAssert(fr.gcount() == size); // makes sure that the gcount is correct + CkAssert(fr.tellg() == + (size + bytesToRead * thisIndex)); // make sure that the tellg points to the + // correct place in the stream + CkPrintf( + "the FileReader::read function on tester[%d] is done with first character=%c\n", + thisIndex, file_reader_buffer[0]); + + testFileReader(fr); + + // read using plain Ck::IO::read Ck::IO::read(token, bytesToRead, bytesToRead * thisIndex, dataBuffer, sessionEnd); } + void testFileReader(Ck::IO::FileReader& fr) + { + size_t og_pos = fr.tellg(); + fr.seekg( + 100000000000000); // way beyond the bounds of read session, should trigger eof + CkAssert(fr.eof()); + fr.seekg(5); + 
CkAssert(fr.eof() == false); + fr.seekg(1, std::ios_base::cur); + CkAssert(fr.tellg() == 6); // test that the seekg with different offset worked + fr.seekg(0, std::ios_base::end); + CkAssert(fr.eof()); // sought to the end of the session, make sure that the eof flag is on + fr.seekg(6, std::ios_base::beg); + CkAssert(fr.tellg() == 6); // NOTE(review): holds only when the session starts at file offset 0 - confirm + fr.seekg(og_pos); + } + Test(CkMigrateMessage* m) {} void readDone(Ck::IO::ReadCompleteMsg* m) @@ -96,14 +137,16 @@ public: if (verify_buffer[i] != dataBuffer[i]) { CkPrintf( - "From reader %d, offset=%d, bytes=%d, verify_buuffer[%d]=%c, " + "From reader %d, offset=%zu, bytes=%zu, verify_buuffer[%d]=%c, " "dataBuffer[%d]=%c\n", thisIndex, (m->offset), (m->bytes), i, verify_buffer[i], i, dataBuffer[i]); } assert(verify_buffer[i] == dataBuffer[i]); + assert(verify_buffer[i] == file_reader_buffer[i]); } delete[] verify_buffer; delete[] dataBuffer; + delete[] file_reader_buffer; CkPrintf("Index %d is now done with the reads...\n", thisIndex); contribute(done); } diff --git a/tests/charm++/io_read/iotest.ci b/tests/charm++/io_read/iotest.ci index 7ed66f071d..c7ed55ded0 100644 --- a/tests/charm++/io_read/iotest.ci +++ b/tests/charm++/io_read/iotest.ci @@ -79,6 +79,7 @@ entry void iterDone(); array[1D] Test { entry Test(Ck::IO::Session token, size_t bytesToRead, std::string filename); + entry [threaded] void testMethod(Ck::IO::Session token, size_t bytesToRead); entry void readDone(Ck::IO::ReadCompleteMsg * m); } }