PerlMonks  

Re^4: Solving the Long List is Long challenge - Tkrzw llil4tkh2

by marioroy (Prior)
on Jul 17, 2023 at 05:33 UTC (id://11153433)


in reply to Re^3: Solving the Long List is Long challenge - Learning Tkrzw
in thread Solving the Long List is Long challenge, finally?

I created another Tkrzw demonstration. This one constructs many HashDBMs, one per shard, and the sharding is managed by the application itself.

Update 1: The HashDBMs are now interchangeable with ShardDBMs, since both use the same hash function.
Update 2: Replaced bswap_64 with the library function tkrzw::StrToIntBigEndian.
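Update 2 concerns how the counters come back out of a HashDBM. As far as I know, IncrementSimple stores each count as an 8-byte big-endian integer, which is why an earlier version byte-swapped it with bswap_64 on a little-endian host. The sketch below is a hand-written, portable equivalent of that decoding, not tkrzw's own code; `DecodeBigEndian64` and `EncodeBigEndian64` are hypothetical names, and the sketch assumes exactly 8 bytes.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>

// Decode an 8-byte big-endian value into an int64_t, most significant byte
// first. Working byte by byte gives the same answer on any host byte order,
// unlike copying the bytes into a uint64_t and then calling bswap_64.
int64_t DecodeBigEndian64(std::string_view bytes)
{
   assert(bytes.size() == 8);
   uint64_t v = 0;
   for (unsigned char b : bytes)
      v = (v << 8) | b;
   return static_cast<int64_t>(v);
}

// Inverse operation: encode an int64_t as 8 big-endian bytes.
std::string EncodeBigEndian64(int64_t value)
{
   std::string out(8, '\0');
   uint64_t v = static_cast<uint64_t>(value);
   for (int i = 7; i >= 0; --i) {
      out[i] = static_cast<char>(v & 0xFF);
      v >>= 8;
   }
   return out;
}
```

Because the byte order is fixed by the format rather than by the CPU, a library helper such as tkrzw::StrToIntBigEndian avoids the host-specific bswap_64 call entirely.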

#include <tkrzw_dbm_common_impl.h>

idx = tkrzw::SecondaryHash(key, nmaps);
dbm_ret[idx].IncrementSimple(key, count);
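The snippet above routes each key to one of `nmaps` HashDBM shards and increments its count there. Below is a minimal in-memory sketch of the same application-managed sharding, assuming `std::unordered_map` shards each guarded by its own `std::mutex`, and `std::hash` in place of `tkrzw::SecondaryHash` (so its shard indexes will not match a tkrzw ShardDBM). The `ShardedCounter` class is hypothetical.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical in-memory analogue of an array of tkrzw::HashDBM shards.
// Each shard has its own mutex, so threads that hash to different shards
// do not contend with one another.
template <std::size_t NumShards>
class ShardedCounter {
   static_assert(NumShards > 0, "need at least one shard");

 public:
   // Choose a shard from the key's hash (std::hash, not tkrzw::SecondaryHash).
   static std::size_t ShardIndex(std::string_view key) {
      std::size_t idx = std::hash<std::string_view>{}(key) % NumShards;
      assert(idx < NumShards);
      return idx;
   }

   // Add `count` to the key's total; a new key starts from zero.
   void Increment(std::string_view key, int64_t count) {
      Shard& s = shards_[ShardIndex(key)];
      std::lock_guard<std::mutex> lock(s.mtx);
      s.map[std::string(key)] += count;
   }

   // Current total for `key`, or 0 if it was never seen.
   int64_t Get(std::string_view key) {
      Shard& s = shards_[ShardIndex(key)];
      std::lock_guard<std::mutex> lock(s.mtx);
      auto it = s.map.find(std::string(key));
      return it == s.map.end() ? 0 : it->second;
   }

   // Number of distinct keys, summed over all shards.
   std::size_t CountUnique() {
      std::size_t n = 0;
      for (Shard& s : shards_) {
         std::lock_guard<std::mutex> lock(s.mtx);
         n += s.map.size();
      }
      return n;
   }

 private:
   struct Shard {
      std::mutex mtx;
      std::unordered_map<std::string, int64_t> map;
   };
   std::array<Shard, NumShards> shards_;
};
```

Because every key lands in exactly one shard, per-shard totals never need merging: collecting the results is just a walk over all shards, one independent task per shard.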
$ NUM_THREADS=24 NUM_MAPS=96 ./llil4tkh2 big{1,2,3}.txt | cksum
llil4tkh2 (fixed string length=12) start
sharding managed by the application
use OpenMP
use boost sort
get properties        0.446 secs  23.645 mil QPS
hashDBMs to vector    0.354 secs
vector stable sort    0.081 secs
write stdout          0.210 secs
total time            1.092 secs
   count lines     10545600
   count unique    10367603
2956888413 93308427

# Results for 26 big files:

$ NUM_THREADS=24 NUM_MAPS=96 ./llil4tkh2 in/biga* | cksum
llil4tkh2 (fixed string length=12) start
sharding managed by the application
use OpenMP
use boost sort
get properties        3.507 secs  26.051 mil QPS
hashDBMs to vector    1.777 secs
vector stable sort    0.665 secs
write stdout          1.532 secs
total time            7.483 secs
   count lines     91395200
   count unique    79120065
2005669956 712080585

$ NUM_THREADS=48 NUM_MAPS=128 ./llil4tkh2 in/biga* | cksum
llil4tkh2 (fixed string length=12) start
sharding managed by the application
use OpenMP
use boost sort
get properties        2.335 secs  39.141 mil QPS
hashDBMs to vector    1.410 secs
vector stable sort    0.677 secs
write stdout          1.555 secs
total time            5.979 secs
   count lines     91395200
   count unique    79120065
2005669956 712080585

# One billion+ lines (312 big files)

$ NUM_THREADS=48 NUM_MAPS=128 ./llil4tkh2 \
    in/biga* in/biga* in/biga* in/biga* in/biga* in/biga* \
    in/biga* in/biga* in/biga* in/biga* in/biga* in/biga* \
    | cksum
llil4tkh2 (fixed string length=12) start
sharding managed by the application
use OpenMP
use boost sort
get properties       24.295 secs  45.143 mil QPS
hashDBMs to vector    1.410 secs
vector stable sort    0.644 secs
write stdout          1.439 secs
total time           27.790 secs
   count lines     1096742400
   count unique    79120065
3625599930 791200650
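The llil4tkh2.cc listing doesn't print the "mil QPS" column, but the logged figures appear to be roughly count lines divided by the "get properties" time, in millions (for example 10545600 / 0.446 s is about 23.645). The 26-file, 24-thread run differs slightly from that ratio, so the program that produced these logs may time the phase a little differently. A small helper showing the arithmetic under that assumption; `MillionLinesPerSec` is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>

// Input lines processed per second, in millions.
// Assumes "mil QPS" = count lines / "get properties" seconds / 1e6.
double MillionLinesPerSec(int64_t num_lines, double secs)
{
   assert(secs > 0.0);
   return static_cast<double>(num_lines) / secs / 1e6;
}
```

For the one-billion-line run, MillionLinesPerSec(1096742400, 24.295) comes out at about 45.14, in line with the logged 45.143.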

llil4tkh2.cc

// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// llil4tkh2.cc
// A tkrzw::HashDBM demonstration.
// https://www.perlmonks.com/?node_id=11149643
//
// April 25, 2024
// Based on llil3m.cpp https://perlmonks.com/?node_id=11149482
// Original challenge https://perlmonks.com/?node_id=11147822
//      and summary https://perlmonks.com/?node_id=11150293
// Other demonstrations https://perlmonks.com/?node_id=11149907
//
// Authors
//   Mario Roy - C++ demonstration with parallel capabilities
//   eyepopslikeamosquito - Co-author, learning C++ at PerlMonks.com
//
// See also, memory efficient variant
// https://gist.github.com/marioroy/d02881b96b20fa1adde4388b3e216163
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// OpenMP Little Book - https://nanxiao.gitbooks.io/openmp-little-book
//
// On macOS, use g++-12 from https://brew.sh (installation: brew install gcc@12).
//
// This demonstration requires the Tkrzw C++ library.
//  - homepage       https://dbmx.net/tkrzw/
//  - C++ source     https://dbmx.net/tkrzw/pkg/
//  - documentation  https://dbmx.net/tkrzw/api/
//  - GitHub page    https://github.com/estraier/tkrzw
//  - installation:
//      on Linux: ./configure
//      on macOS: CC=gcc-12 CXX=g++-12 ./configure
//      make
//      make install   # Note: you may need to use "sudo"
//
// The Tkrzw bundled command "tkrzw_build_util" is useful for finding the include path and LDFLAGS.
//   tkrzw_build_util config -i
//   tkrzw_build_util config -l
//
// Compile on Linux (clang++ or g++):
//   clang++ -o llil4tkh2 -std=c++20 -fopenmp -Wall -O3 llil4tkh2.cc -I/usr/local/include -L/usr/local/lib -ltkrzw -lstdc++ -lrt -latomic -lpthread -lm -lc
//
// Compile on macOS (g++-12):
//   g++-12 -o llil4tkh2 -std=c++20 -fopenmp -Wall -O3 llil4tkh2.cc -I/usr/local/include -L/usr/local/lib -ltkrzw -lstdc++ -latomic -lpthread -lm -lc
//
// Obtain gen-llil.pl and gen-long-llil.pl from https://perlmonks.com/?node_id=11148681
//   perl gen-llil.pl big1.txt 200 3 1
//   perl gen-llil.pl big2.txt 200 3 1
//   perl gen-llil.pl big3.txt 200 3 1
//
// To make random input, obtain shuffle.pl from https://perlmonks.com/?node_id=11149800
//   perl shuffle.pl big1.txt >tmp && mv tmp big1.txt
//   perl shuffle.pl big2.txt >tmp && mv tmp big2.txt
//   perl shuffle.pl big3.txt >tmp && mv tmp big3.txt
//
// Example run:  llil4tkh2 big1.txt big2.txt big3.txt >out.txt
//               TMPDIR=/dev/shm NUM_THREADS=8 NUM_MAPS=32 llil4tkh2 ...
//               TMPDIR=/dev/shm NUM_THREADS=max NUM_MAPS=max llil4tkh2 ...
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>

#include <algorithm>
#include <compare>
#include <chrono>
#include <string>
#include <string_view>
#include <array>
#include <vector>
#include <memory>
#include <iterator>
#include <utility>
#include <thread>
#include <execution>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <fstream>

#include <unistd.h>
#include <filesystem>
namespace fs = std::filesystem;

#include <tkrzw_dbm_common_impl.h>
#include <tkrzw_dbm_hash.h>
#include <tkrzw_str_util.h>

static_assert(sizeof(size_t) == sizeof(int64_t), "size_t too small, need a 64-bit compile");

// Specify 0/1 to use boost's parallel sorting algorithm; faster than __gnu_parallel::sort.
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/html/sort/parallel.html
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/papers/block_indirect_sort_en.pdf
// This requires the boost header files: e.g. devpkg-boost bundle on Clear Linux.
// Note: Another option is downloading and unpacking Boost locally.
// (no need to build it because the bits we use are header file only)
#define USE_BOOST_PARALLEL_SORT 1

#if USE_BOOST_PARALLEL_SORT
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wshadow"
#include <boost/sort/sort.hpp>
#pragma clang diagnostic pop
#else
#include <boost/sort/sort.hpp>
#endif
#endif

#ifdef _OPENMP
#include <omp.h>
#endif

// ----------------------------------------------------------------------------

typedef int64_t int_type;

// All words in big1.txt, big2.txt, big3.txt are <= 6 chars in length.
// big.txt  max word length is 6
// long.txt max word length is 208
//
// Based on rough benchmarking, the short fixed string hack below is only
// worth trying for MAX_STR_LEN_L up to about 30.
// See also https://backlinko.com/google-keyword-study
//
// To use (limited length) fixed length strings uncomment the next line.
#define MAX_STR_LEN_L (size_t) 12

#ifdef MAX_STR_LEN_L
struct str_type : std::array<char, MAX_STR_LEN_L> {
   bool operator==( const str_type& o ) const {
      return ::memcmp(this->data(), o.data(), MAX_STR_LEN_L) == 0;
   }
   bool operator<( const str_type& o ) const {
      return ::memcmp(this->data(), o.data(), MAX_STR_LEN_L) < 0;
   }
};
#else
using str_type = std::basic_string<char>;
#endif

using str_int_type     = std::pair<str_type, int_type>;
using vec_str_int_type = std::vector<str_int_type>;

// Mimic the Perl get_properties subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

// Convert a non-negative decimal number from string to uint64_t.
// Parsing stops at the first non-digit character.
inline uint64_t fast_atoll64(const char* str)
{
   uint64_t val = 0;
   uint8_t  digit;
   while ((digit = uint8_t(*str++ - '0')) <= 9)
      val = val * 10 + digit;
   return val;
}

// Helper function to find a character.
inline char* find_char(char* first, char* last, char c)
{
   while (first != last) {
      if (*first == c) break;
      ++first;
   }
   return first;
}

// Limit line length and chunk size.
inline constexpr size_t MAX_LINE_LEN = 255;
inline constexpr size_t CHUNK_SIZE   = 32768;
inline constexpr int    MAX_NUM_MAPS = 255;

static int64_t get_properties(
   const char* fname,   // in   : the input file name
   const int   nthds,   // in   : the number of threads
   const int   nmaps,   // in   : the number of maps (or shards)
   auto&       dbm_ret) // inout: the dbm to be updated
{
   int64_t num_lines = 0;
   std::ifstream fin(fname, std::ifstream::binary);
   if (!fin.is_open()) {
      std::cerr << "Error opening '" << fname << "' : " << strerror(errno) << '\n';
      return num_lines;
   }

   #pragma omp parallel reduction(+:num_lines)
   {
      std::string buf;
      buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0');

      while (fin.good()) {
         size_t len = 0;

         // Read the next chunk serially.
         #pragma omp critical
         {
            fin.read(&buf[0], CHUNK_SIZE);
            if ((len = fin.gcount()) > 0) {
               if (buf[len - 1] != '\n' && fin.getline(&buf[len], MAX_LINE_LEN)) {
                  // Getline discards the newline char and appends null char.
                  // Therefore, change '\0' to '\n'.
                  len += fin.gcount();
                  buf[len - 1] = '\n';
               }
            }
         }

         if (!len)
            break;

         buf[len] = '\0';
         char *first = &buf[0];
         char *last  = &buf[len];

         // Process max Nthreads chunks concurrently.
         while (first < last) {
            char* beg_ptr{first};
            char* end_ptr{find_char(first, last, '\n')};
            char* found = find_char(beg_ptr, end_ptr, '\t');
            first = end_ptr + 1;

            if (found == end_ptr)
               continue;

            assert(*found == '\t');
            int_type count = fast_atoll64(found + 1);

#ifdef MAX_STR_LEN_L
            size_t klen = std::min(MAX_STR_LEN_L, (size_t)(found - beg_ptr));
#else
            size_t klen = found - beg_ptr;
#endif

            std::basic_string_view<char> key{
               reinterpret_cast<const char*>(beg_ptr), klen };

            // Route the key to its shard, then add its count there.
            size_t idx = tkrzw::SecondaryHash(key, nmaps);
            dbm_ret[idx].IncrementSimple(key, count);

            ++num_lines;
         }
      }
   }

   fin.close();
   // std::cerr << "getprops done\n";

   return num_lines;
}

// Output subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

size_t divide_up(size_t dividend, size_t divisor)
{
   if (dividend % divisor)
      return (size_t)(dividend / divisor) + 1;
   else
      return (size_t)(dividend / divisor);
}

static void out_properties(
   const int         nthds,  // in : the number of threads
   vec_str_int_type& vec)    // in : the vector to output
{
   size_t num_chunks = divide_up(vec.size(), CHUNK_SIZE);
   int nthds_out = 1;
#ifdef _OPENMP
   nthds_out = std::min(nthds, 32);
#endif

   #pragma omp parallel for ordered schedule(static, 1) num_threads(nthds_out)
   for (size_t chunk_id = 1; chunk_id <= num_chunks; ++chunk_id) {
      std::string str("");
      str.reserve(2048 * 1024);
      auto it  = vec.begin() + (chunk_id - 1) * CHUNK_SIZE;
      auto it2 = vec.begin() + std::min(vec.size(), chunk_id * CHUNK_SIZE);

      for (; it != it2; ++it) {
#ifdef MAX_STR_LEN_L
         // A key that fills all MAX_STR_LEN_L bytes has no terminating '\0',
         // so bound the length instead of appending it as a C string.
         const char* s = it->first.data();
         str.append(s, std::find(s, s + MAX_STR_LEN_L, '\0') - s);
#else
         str.append(it->first.data(), it->first.size());
#endif
         str.append("\t", 1);
         str.append(std::to_string(it->second));
         str.append("\n", 1);
      }

      #pragma omp ordered
      std::cout << str << std::flush;
   }
}

typedef std::chrono::high_resolution_clock high_resolution_clock;
typedef std::chrono::high_resolution_clock::time_point time_point;
typedef std::chrono::milliseconds milliseconds;

double elaspe_time(time_point cend, time_point cstart)
{
   return double (
      std::chrono::duration_cast<milliseconds>(cend - cstart).count()
   ) * 1e-3;
}

// ---------------------------------------------------------------------

int main(int argc, char* argv[])
{
   if (argc < 2) {
      if (argc > 0)
         std::cerr << "usage: llil4tkh2 file1 file2 ... >out.txt\n";
      return 1;
   }

   // Determine the temp dir to use.
   fs::path tmpdir = fs::temp_directory_path();
   const char* env_tmpdir = std::getenv("TMPDIR");
   if (env_tmpdir && strlen(env_tmpdir))
      tmpdir.assign(env_tmpdir);
   else if (fs::is_directory("/dev/shm") && !access("/dev/shm", W_OK))
      tmpdir.assign("/dev/shm");

#ifdef _OPENMP
   // Determine the number of maps (or shards).
   const char* env_nmaps = std::getenv("NUM_MAPS");
   int nmaps = (env_nmaps && strlen(env_nmaps))
      ? (::strcmp(env_nmaps, "max") == 0)
         ? MAX_NUM_MAPS
         : ::atoi(env_nmaps)
      : 32;
   if (nmaps <= 0) nmaps = 32;
   if (nmaps > MAX_NUM_MAPS) nmaps = MAX_NUM_MAPS;

   // Determine the number of threads.
   const char* env_nthds = std::getenv("NUM_THREADS");
   int ncpus = std::thread::hardware_concurrency();
   int nthds = (env_nthds && strlen(env_nthds))
      ? (::strcmp(env_nthds, "max") == 0)
         ? ncpus
         : ::atoi(env_nthds)
      : ncpus;
   if (nthds <= 0) nthds = ncpus;
   omp_set_dynamic(false);
   omp_set_num_threads(nthds);
#else
   int nmaps = 1;
   int nthds = 1;
#endif

   std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fixed);
#ifdef MAX_STR_LEN_L
   std::cerr << "llil4tkh2 (fixed string length=" << MAX_STR_LEN_L;
#else
   std::cerr << "llil4tkh2 (fixed string length=unlimited";
#endif
   std::cerr << ", threads=" << nthds << ", maps=" << nmaps << ") start\n";
   std::cerr << "sharding managed by the application\n";
#ifdef _OPENMP
   std::cerr << "use OpenMP\n";
#else
   std::cerr << "don't use OpenMP\n";
#endif
#if USE_BOOST_PARALLEL_SORT == 0
   std::cerr << "don't use boost sort\n";
#else
   std::cerr << "use boost sort\n";
#endif

   time_point cstart1, cend1, cstart2, cend2, cstart3, cend3s, cend3;
   cstart1 = high_resolution_clock::now();

   // Get the list of input files from the command line.
   int nfiles = argc - 1;
   char** fname = &argv[1];
   char path[255];

   // Sharding is managed by the application.
   std::array<tkrzw::HashDBM, MAX_NUM_MAPS> dbm;
   for (int i = 0; i < nmaps; ++i) {
      std::snprintf(path, sizeof(path), "%s/llil.tkh-%05d-of-%05d", tmpdir.c_str(), i, nmaps);
      dbm[i].Open(
         path, true, tkrzw::File::OPEN_DEFAULT | tkrzw::File::OPEN_TRUNCATE
      ).OrDie();
   }

   // Load properties into the DB.
   int64_t num_lines = 0;
   for (int i = 0; i < nfiles; ++i)
      num_lines += get_properties(fname[i], nthds, nmaps, dbm);

   int64_t num_keys = 0;
   for (int i = 0; i < nmaps; ++i)
      num_keys += dbm[i].CountSimple();

   cend1 = high_resolution_clock::now();
   double ctaken1 = elaspe_time(cend1, cstart1);
   std::cerr << "get properties     " << std::setw(8) << ctaken1 << " secs\n";

   if (num_keys <= 0) {
      std::cerr << "No work, exiting...\n";
      return 1;
   }

   cstart2 = high_resolution_clock::now();

   // Store the properties into a vector
   vec_str_int_type propvec;
   propvec.reserve(num_keys);

   #pragma omp parallel for schedule(static, 1)
   for (int i = 0; i < nmaps; ++i) {
      int64_t map_keys = dbm[i].CountSimple();
      if (map_keys > 0) {
         vec_str_int_type locvec;
         locvec.reserve(map_keys);
#ifdef MAX_STR_LEN_L
         str_type str;
#endif
         std::string key, value;
         std::unique_ptr<tkrzw::DBM::Iterator> iter = dbm[i].MakeIterator();
         iter->First();
         while (iter->Get(&key, &value) == tkrzw::Status::SUCCESS) {
#ifdef MAX_STR_LEN_L
            ::memset(str.data(), '\0', str.size());
            ::memcpy(str.data(), key.data(), key.size());
            locvec.emplace_back(str, tkrzw::StrToIntBigEndian(value));
#else
            locvec.emplace_back(key, tkrzw::StrToIntBigEndian(value));
#endif
            iter->Next();
         }
         #pragma omp critical
         propvec.insert(
            // Append local vector to propvec
            propvec.end(),
            std::make_move_iterator(locvec.begin()),
            std::make_move_iterator(locvec.end())
         );
      }
      dbm[i].Close();
   }

   // Remove the temp DB files.
   for (int i = 0; i < nmaps; ++i) {
      std::snprintf(path, sizeof(path), "%s/llil.tkh-%05d-of-%05d", tmpdir.c_str(), i, nmaps);
      fs::remove(path);
   }

   cend2 = high_resolution_clock::now();
   double ctaken2 = elaspe_time(cend2, cstart2);
   std::cerr << "hashDBMs to vector " << std::setw(8) << ctaken2 << " secs\n";

   cstart3 = high_resolution_clock::now();

   // Sort the vector by (count) in reverse order, (name) in lexical order
   auto reverse_order = [](const str_int_type& left, const str_int_type& right) {
      return left.second != right.second
         ? left.second > right.second
         : left.first  < right.first;
   };

#if USE_BOOST_PARALLEL_SORT == 0
   // Standard sort
   std::sort(propvec.begin(), propvec.end(), reverse_order);
#else
   // Parallel sort
   boost::sort::block_indirect_sort(
      propvec.begin(), propvec.end(), reverse_order,
#ifdef __NVCOMPILER_LLVM__
      std::min(nthds, 32)
#else
      nthds
#endif
   );
#endif

   cend3s = high_resolution_clock::now();

   // Output the sorted vector
   out_properties(nthds, propvec);

   cend3 = high_resolution_clock::now();

   double ctaken   = elaspe_time(cend3, cstart1);
   double ctaken3s = elaspe_time(cend3s, cstart3);
   double ctaken3o = elaspe_time(cend3, cend3s);

   std::cerr << "vector stable sort " << std::setw(8) << ctaken3s << " secs\n";
   std::cerr << "write stdout       " << std::setw(8) << ctaken3o << " secs\n";
   std::cerr << "total time         " << std::setw(8) << ctaken   << " secs\n";
   std::cerr << "   count lines     " << num_lines << "\n";
   std::cerr << "   count unique    " << num_keys  << "\n";

   // Hack to see Private Bytes in Windows Task Manager
   // (uncomment next line so process doesn't exit too quickly)
   // std::this_thread::sleep_for(milliseconds(9000));

   return 0;
}
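The final step orders the results by count, highest first, breaking ties by key in ascending order. The listing labels this phase "vector stable sort", although as far as I know boost's block_indirect_sort is not a stable algorithm. That is harmless here: keys are unique after counting, so the comparator never treats two elements as equivalent and any correct sort yields the same output. Below is a minimal sequential sketch of that ordering using `std::string` keys and `std::sort`; `KeyCount`, `ReverseOrder` and `SortProperties` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using KeyCount = std::pair<std::string, int64_t>;

// Count descending, then key ascending: the same ordering as the
// reverse_order lambda in llil4tkh2.cc.
bool ReverseOrder(const KeyCount& left, const KeyCount& right)
{
   return left.second != right.second ? left.second > right.second
                                      : left.first < right.first;
}

// With unique keys this comparator is a strict total order, so an
// unstable sort gives exactly the result a stable sort would.
void SortProperties(std::vector<KeyCount>& vec)
{
   std::sort(vec.begin(), vec.end(), ReverseOrder);
   assert(std::is_sorted(vec.begin(), vec.end(), ReverseOrder));
}
```

For example, {"pear", 2}, {"apple", 7}, {"fig", 2}, {"kiwi", 9} sorts to kiwi, apple, fig, pear: the two keys with count 2 fall back to lexical order.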
