I created another Tkrzw demonstration. This one constructs many HashDBMs; sharding is managed by the application.
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~
// llil4tkh2.cc
// A tkrzw::HashDBM demonstration.
// https://www.perlmonks.com/?node_id=11149643
//
// April 25, 2024
// Based on llil3m.cpp https://perlmonks.com/?node_id=11149482
// Original challenge https://perlmonks.com/?node_id=11147822
// and summary https://perlmonks.com/?node_id=11150293
// Other demonstrations https://perlmonks.com/?node_id=11149907
//
// Authors
// Mario Roy - C++ demonstration with parallel capabilities
// eyepopslikeamosquito - Co-author, learning C++ at PerlMonks.com
//
// See also, memory efficient variant
// https://gist.github.com/marioroy/d02881b96b20fa1adde4388b3e216163
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~
// OpenMP Little Book - https://nanxiao.gitbooks.io/openmp-little-book
//
// On macOS, use g++-12 from https://brew.sh (installation: brew install gcc@12).
//
// This demonstration requires the Tkrzw C++ library.
// - homepage https://dbmx.net/tkrzw/
// - C++ source https://dbmx.net/tkrzw/pkg/
// - documentation https://dbmx.net/tkrzw/api/
// - GitHub page https://github.com/estraier/tkrzw
// - installation:
// on Linux: ./configure
// on macOS: CC=gcc-12 CXX=g++-12 ./configure
// make
// make install # Note: you may need to use "sudo"
//
// The Tkrzw bundled command "tkrzw_build_util" is useful for finding the include flags and LDFLAGS.
// tkrzw_build_util config -i
// tkrzw_build_util config -l
//
// Compile on Linux (clang++ or g++):
// clang++ -o llil4tkh2 -std=c++20 -fopenmp -Wall -O3 llil4tkh2.cc -I/usr/local/include -L/usr/local/lib -ltkrzw -lstdc++ -lrt -latomic -lpthread -lm -lc
//
// Compile on macOS (g++-12):
// g++-12 -o llil4tkh2 -std=c++20 -fopenmp -Wall -O3 llil4tkh2.cc -I/usr/local/include -L/usr/local/lib -ltkrzw -lstdc++ -latomic -lpthread -lm -lc
//
// Obtain gen-llil.pl and gen-long-llil.pl from https://perlmonks.com/?node_id=11148681
// perl gen-llil.pl big1.txt 200 3 1
// perl gen-llil.pl big2.txt 200 3 1
// perl gen-llil.pl big3.txt 200 3 1
//
// To make random input, obtain shuffle.pl from https://perlmonks.com/?node_id=11149800
// perl shuffle.pl big1.txt >tmp && mv tmp big1.txt
// perl shuffle.pl big2.txt >tmp && mv tmp big2.txt
// perl shuffle.pl big3.txt >tmp && mv tmp big3.txt
//
// Example run: llil4tkh2 big1.txt big2.txt big3.txt >out.txt
// TMPDIR=/dev/shm NUM_THREADS=8 NUM_MAPS=32 llil4tkh2 ...
// TMPDIR=/dev/shm NUM_THREADS=max NUM_MAPS=max llil4tkh2 ...
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~
#include <cassert>
#include <cstdio>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <compare>
#include <chrono>
#include <string>
#include <string_view>
#include <array>
#include <vector>
#include <thread>
#include <execution>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <fstream>
#include <unistd.h>
#include <filesystem>
namespace fs = std::filesystem;
#include <tkrzw_dbm_common_impl.h>
#include <tkrzw_dbm_hash.h>
#include <tkrzw_str_util.h>
// Require a 64-bit build: the program mixes size_t and int64_t freely
// (e.g. vector::reserve() is called with int64_t key counts).
static_assert(sizeof(size_t) == sizeof(int64_t), "size_t too small, need a 64-bit compile");
// Set to 1 to use boost's parallel sorting algorithm (faster than __gnu_parallel::sort),
// or 0 to use std::sort.
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/html/sort/parallel.html
// https://www.boost.org/doc/libs/1_85_0/libs/sort/doc/papers/block_indirect_sort_en.pdf
// This requires the boost header files: e.g. devpkg-boost bundle on Clear Linux.
// Note: Another option is downloading and unpacking Boost locally.
// (no need to build it because the bits we use are header file only)
#define USE_BOOST_PARALLEL_SORT 1
#if USE_BOOST_PARALLEL_SORT
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wshadow"
#include <boost/sort/sort.hpp>
#pragma clang diagnostic pop
#else
#include <boost/sort/sort.hpp>
#endif
#endif
#ifdef _OPENMP
#include <omp.h>
#endif
// ----------------------------------------------------------------------------
using int_type = int64_t;

// Key length notes:
//   - every word in big1.txt, big2.txt and big3.txt is at most 6 chars long;
//   - big.txt: the longest word is 6 chars;
//   - long.txt: the longest word is 208 chars.
//
// Rough benchmarking suggests the fixed-length string hack below only pays
// off for MAX_STR_LEN_L up to about 30.
// See also https://backlinko.com/google-keyword-study
//
// The #define below enables fixed-length, NUL-padded keys (longer input keys
// are truncated to MAX_STR_LEN_L bytes). Comment it out to use std::string
// keys of unlimited length instead.
#define MAX_STR_LEN_L (size_t) 12

#ifdef MAX_STR_LEN_L
// Fixed-size key. Comparisons look at all MAX_STR_LEN_L bytes, including
// the NUL padding, so shorter keys order before longer keys they prefix.
struct str_type : std::array<char, MAX_STR_LEN_L> {
    bool operator==(const str_type& other) const {
        const int diff = ::memcmp(data(), other.data(), MAX_STR_LEN_L);
        return diff == 0;
    }
    bool operator<(const str_type& other) const {
        const int diff = ::memcmp(data(), other.data(), MAX_STR_LEN_L);
        return diff < 0;
    }
};
#else
using str_type = std::string;
#endif

using str_int_type = std::pair<str_type, int_type>;
using vec_str_int_type = std::vector<str_int_type>;
// Mimic the Perl get_properties subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Convert the unsigned decimal number at the start of str to uint64_t,
// stopping at the first character that is not a digit '0'..'9' (so "" or a
// leading non-digit yields 0). No sign, whitespace or overflow handling.
// The result is 64-bit to match int_type, where it is stored; a 32-bit
// result would silently wrap counts above 4294967295.
inline uint64_t fast_atoll64(const char* str)
{
    uint64_t val = 0;
    uint8_t digit;
    // Characters below '0' wrap to values > 9 as uint8_t, so a single
    // comparison rejects everything outside '0'..'9'.
    while ((digit = uint8_t(*str++ - '0')) <= 9)
        val = val * 10 + digit;
    return val;
}
// Return a pointer to the first occurrence of c in [first, last),
// or last when c does not occur in the range.
inline char* find_char(char* first, char* last, char c)
{
    while (first != last && *first != c)
        ++first;
    return first;
}
// Input is read in CHUNK_SIZE-byte chunks, each extended by at most
// MAX_LINE_LEN bytes so that it ends on a line boundary.
// MAX_NUM_MAPS is the upper bound on the number of HashDBM shards.
inline constexpr size_t MAX_LINE_LEN{255};
inline constexpr size_t CHUNK_SIZE{32768};
inline constexpr int MAX_NUM_MAPS{255};
static int64_t get_properties(
const char* fname, // in : the input file name
const int nthds, // in : the number of threads
const int nmaps, // in : the number of maps (or shar
+ds)
auto& dbm_ret) // inout: the dbm to be updated
{
int64_t num_lines = 0;
std::ifstream fin(fname, std::ifstream::binary);
if (!fin.is_open()) {
std::cerr << "Error opening '" << fname << "' : " << strerror(er
+rno) << '\n';
return num_lines;
}
#pragma omp parallel reduction(+:num_lines)
{
std::string buf;
buf.resize(CHUNK_SIZE + MAX_LINE_LEN + 1, '\0');
while (fin.good()) {
size_t len = 0;
// Read the next chunk serially.
#pragma omp critical
{
fin.read(&buf[0], CHUNK_SIZE);
if ((len = fin.gcount()) > 0) {
if (buf[len - 1] != '\n' && fin.getline(&buf[len], MAX_
+LINE_LEN)) {
// Getline discards the newline char and appends nul
+l char.
// Therefore, change '\0' to '\n'.
len += fin.gcount();
buf[len - 1] = '\n';
}
}
}
if (!len)
break;
buf[len] = '\0';
char *first = &buf[0];
char *last = &buf[len];
// Process max Nthreads chunks concurrently.
while (first < last) {
char* beg_ptr{first};
char* end_ptr{find_char(first, last, '\n')};
char* found = find_char(beg_ptr, end_ptr, '\t');
first = end_ptr + 1;
if (found == end_ptr)
continue;
assert(*found == '\t');
int_type count = fast_atoll64(found + 1);
#ifdef MAX_STR_LEN_L
size_t klen = std::min(MAX_STR_LEN_L, (size_t)(found - beg
+_ptr));
#else
size_t klen = found - beg_ptr;
#endif
std::basic_string_view<char> key {
reinterpret_cast<const char*>(beg_ptr), klen };
size_t idx = tkrzw::SecondaryHash(key, nmaps);
dbm_ret[idx].IncrementSimple(key, count);
++num_lines;
}
}
}
fin.close();
// std::cerr << "getprops done\n";
return num_lines;
}
// Output subroutine ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Ceiling division: how many divisor-sized pieces are needed to cover
// dividend. divisor must be non-zero.
size_t divide_up(size_t dividend, size_t divisor)
{
    const size_t whole = dividend / divisor;
    const bool has_remainder = (dividend % divisor) != 0;
    return has_remainder ? whole + 1 : whole;
}
static void out_properties(
const int nthds, // in : the number of threads
vec_str_int_type& vec) // in : the vector to output
{
size_t num_chunks = divide_up(vec.size(), CHUNK_SIZE);
int nthds_out = 1;
#ifdef _OPENMP
nthds_out = std::min(nthds, 32);
#endif
#pragma omp parallel for ordered schedule(static, 1) num_threads(nt
+hds_out)
for (size_t chunk_id = 1; chunk_id <= num_chunks; ++chunk_id) {
std::string str(""); str.reserve(2048 * 1024);
auto it = vec.begin() + (chunk_id - 1) * CHUNK_SIZE;
auto it2 = vec.begin() + std::min(vec.size(), chunk_id * CHUNK_S
+IZE);
for (; it != it2; ++it) {
#ifdef MAX_STR_LEN_L
str.append(it->first.data());
#else
str.append(it->first.data(), it->first.size());
#endif
str.append("\t", 1);
str.append(std::to_string(it->second));
str.append("\n", 1);
}
#pragma omp ordered
std::cout << str << std::flush;
}
}
using high_resolution_clock = std::chrono::high_resolution_clock;
using time_point = std::chrono::high_resolution_clock::time_point;
using milliseconds = std::chrono::milliseconds;

// Seconds elapsed from cstart to cend. The interval is first truncated to
// whole milliseconds, so sub-millisecond remainders are dropped.
// (The "elaspe" spelling is kept because callers use this name.)
double elaspe_time(time_point cend, time_point cstart) {
    const milliseconds elapsed = std::chrono::duration_cast<milliseconds>(cend - cstart);
    const double elapsed_ms = static_cast<double>(elapsed.count());
    return elapsed_ms * 1e-3;
}
// -----------------------------------------------------------------------------
int main(int argc, char* argv[])
{
if (argc < 2) {
if (argc > 0)
std::cerr << "usage: llil4tkh2 file1 file2 ... >out.txt\n";
return 1;
}
// Determine the temp dir to use.
fs::path tmpdir = fs::temp_directory_path();
const char* env_tmpdir = std::getenv("TMPDIR");
if (env_tmpdir && strlen(env_tmpdir))
tmpdir.assign(env_tmpdir);
else if (fs::is_directory("/dev/shm") && !access("/dev/shm", W_OK))
tmpdir.assign("/dev/shm");
#ifdef _OPENMP
// Determine the number of maps (or shards).
const char* env_nmaps = std::getenv("NUM_MAPS");
int nmaps = (env_nmaps && strlen(env_nmaps))
? (::strcmp(env_nmaps, "max") == 0) ? MAX_NUM_MAPS : ::atoi(env_
+nmaps)
: 32;
if (nmaps <= 0) nmaps = 32;
if (nmaps > MAX_NUM_MAPS) nmaps = MAX_NUM_MAPS;
// Determine the number of threads.
const char* env_nthds = std::getenv("NUM_THREADS");
int ncpus = std::thread::hardware_concurrency();
int nthds = (env_nthds && strlen(env_nthds))
? (::strcmp(env_nthds, "max") == 0) ? ncpus : ::atoi(env_nthds)
: ncpus;
if (nthds <= 0) nthds = ncpus;
omp_set_dynamic(false);
omp_set_num_threads(nthds);
#else
int nmaps = 1;
int nthds = 1;
#endif
std::cerr << std::setprecision(3) << std::setiosflags(std::ios::fix
+ed);
#ifdef MAX_STR_LEN_L
std::cerr << "llil4tkh (fixed string length=" << MAX_STR_LEN_L;
#else
std::cerr << "llil4tkh (fixed string length=unlimited";
#endif
std::cerr << ", threads=" << nthds << ", maps=" << nmaps << ") star
+t\n";
std::cerr << "sharding managed by the application\n";
#ifdef _OPENMP
std::cerr << "use OpenMP\n";
#else
std::cerr << "don't use OpenMP\n";
#endif
#if USE_BOOST_PARALLEL_SORT == 0
std::cerr << "don't use boost sort\n";
#else
std::cerr << "use boost sort\n";
#endif
time_point cstart1, cend1, cstart2, cend2, cstart3, cend3s, cend3;
cstart1 = high_resolution_clock::now();
// Get the list of input files from the command line.
int nfiles = argc - 1;
char** fname = &argv[1];
char path[255];
// Sharding is managed by the application.
std::array<tkrzw::HashDBM, MAX_NUM_MAPS> dbm;
for (int i = 0; i < nmaps; ++i) {
std::sprintf(path, "%s/llil.tkh-%05d-of-%05d", tmpdir.c_str(), i
+, nmaps);
dbm[i].Open(
path, true,
tkrzw::File::OPEN_DEFAULT | tkrzw::File::OPEN_TRUNCATE
).OrDie();
}
// Load properties into the DB.
int64_t num_lines = 0;
for (int i = 0; i < nfiles; ++i)
num_lines += get_properties(fname[i], nthds, nmaps, dbm);
int64_t num_keys = 0;
for (int i = 0; i < nmaps; ++i)
num_keys += dbm[i].CountSimple();
cend1 = high_resolution_clock::now();
double ctaken1 = elaspe_time(cend1, cstart1);
std::cerr << "get properties " << std::setw(8) << ctaken1 << "
+ secs\n";
if (num_keys <= 0) {
std::cerr << "No work, exiting...\n";
return 1;
}
cstart2 = high_resolution_clock::now();
// Store the properties into a vector
vec_str_int_type propvec; propvec.reserve(num_keys);
#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < nmaps; ++i) {
int64_t num_keys = dbm[i].CountSimple();
if (num_keys > 0) {
vec_str_int_type locvec; locvec.reserve(num_keys);
#ifdef MAX_STR_LEN_L
str_type str;
#endif
std::string key, value;
std::unique_ptr<tkrzw::DBM::Iterator> iter = dbm[i].MakeItera
+tor();
iter->First();
while (iter->Get(&key, &value) == tkrzw::Status::SUCCESS) {
#ifdef MAX_STR_LEN_L
::memset(str.data(), '\0', str.size());
::memcpy(str.data(), key.data(), key.size());
locvec.emplace_back(str, tkrzw::StrToIntBigEndian(value));
#else
locvec.emplace_back(key, tkrzw::StrToIntBigEndian(value));
#endif
iter->Next();
}
#pragma omp critical
propvec.insert(
// Append local vector to propvec
propvec.end(),
std::make_move_iterator(locvec.begin()),
std::make_move_iterator(locvec.end())
);
}
dbm[i].Close();
}
// Remove the temp DB files.
for (int i = 0; i < nmaps; ++i) {
std::sprintf(path, "%s/llil.tkh-%05d-of-%05d", tmpdir.c_str(), i
+, nmaps);
fs::remove(path);
}
cend2 = high_resolution_clock::now();
double ctaken2 = elaspe_time(cend2, cstart2);
std::cerr << "hashDBMs to vector " << std::setw(8) << ctaken2 << "
+ secs\n";
cstart3 = high_resolution_clock::now();
// Sort the vector by (count) in reverse order, (name) in lexical o
+rder
auto reverse_order = [](const str_int_type& left, const str_int_typ
+e& right) {
return left.second != right.second
? left.second > right.second
: left.first < right.first;
};
#if USE_BOOST_PARALLEL_SORT == 0
// Standard sort
std::sort(propvec.begin(), propvec.end(), reverse_order);
#else
// Parallel sort
boost::sort::block_indirect_sort(
propvec.begin(), propvec.end(), reverse_order,
#ifdef __NVCOMPILER_LLVM__
std::min(nthds, 32)
#else
nthds
#endif
);
#endif
cend3s = high_resolution_clock::now();
// Output the sorted vector
out_properties(nthds, propvec);
cend3 = high_resolution_clock::now();
double ctaken = elaspe_time(cend3, cstart1);
double ctaken3s = elaspe_time(cend3s, cstart3);
double ctaken3o = elaspe_time(cend3, cend3s);
std::cerr << "vector stable sort " << std::setw(8) << ctaken3s <<
+" secs\n";
std::cerr << "write stdout " << std::setw(8) << ctaken3o <<
+" secs\n";
std::cerr << "total time " << std::setw(8) << ctaken <<
+" secs\n";
std::cerr << " count lines " << num_lines << "\n";
std::cerr << " count unique " << num_keys << "\n";
// Hack to see Private Bytes in Windows Task Manager
// (uncomment next line so process doesn't exit too quickly)
// std::this_thread::sleep_for(milliseconds(9000));
return 0;
}