Skip to content

Commit

Permalink
Finish implementing cli args and fix bip::vector unpack #3084
Browse files Browse the repository at this point in the history
  • Loading branch information
mvandeberg committed Sep 30, 2019
1 parent d824c29 commit 6121f2d
Show file tree
Hide file tree
Showing 14 changed files with 319 additions and 289 deletions.
8 changes: 8 additions & 0 deletions libraries/appbase/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,12 @@ const variables_map& application::get_args() const
return my->_args;
}

void application::for_each_plugin( std::function< void(const abstract_plugin&) > cb ) const
{
for( auto& p : plugins )
{
cb( *(p.second) );
}
}

} /// namespace appbase
2 changes: 2 additions & 0 deletions libraries/appbase/include/appbase/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ namespace appbase {

boost::asio::io_service& get_io_service() { return *io_serv; }

void for_each_plugin( std::function< void(const abstract_plugin&) > cb ) const;

protected:
template< typename Impl >
friend class plugin;
Expand Down
1 change: 1 addition & 0 deletions libraries/appbase/include/appbase/plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace appbase {
virtual void initialize(const variables_map& options) = 0;
virtual void startup() = 0;
virtual void shutdown() = 0;
virtual void get_impacted_options( boost::any& cfg )const {}

protected:
typedef std::function<void(abstract_plugin&)> plugin_processor;
Expand Down
4 changes: 2 additions & 2 deletions libraries/chain/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ uint32_t database::reindex( const open_args& args )

auto last_block_num = _block_log.head()->block_num();

if( args.stop_replay_at > 0 && args.stop_replay_at < last_block_num )
last_block_num = args.stop_replay_at;
if( args.stop_at_block > 0 && args.stop_at_block < last_block_num )
last_block_num = args.stop_at_block;

if( head_block_num() < last_block_num )
{
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/steem/chain/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ namespace steem { namespace chain {
std::shared_ptr< std::function< void( database&, const open_args& ) > > genesis_func;

// The following fields are only used on reindexing
uint32_t stop_replay_at = 0;
uint32_t stop_at_block = 0;
TBenchmark benchmark = TBenchmark(0, []( uint32_t, const abstract_index_cntr_t& ){});
};

Expand Down
10 changes: 6 additions & 4 deletions libraries/chain/include/steem/chain/statefile/statefile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ struct steem_version_info
std::map< std::string, fc::variant > config;
std::string chain_id;
int32_t head_block_num;

//bool compatible()
};

struct object_section
Expand All @@ -59,8 +57,9 @@ typedef fc::static_variant< object_section > section_header;

struct state_header
{
steem_version_info version;
std::vector< section_header > sections;
steem_version_info version;
std::map< std::string, std::string > plugin_options;
std::vector< section_header > sections;
};

struct section_footer
Expand Down Expand Up @@ -90,6 +89,8 @@ struct state_format_info
write_state_result write_state( const database& db, const std::string& state_filename, const state_format_info& state_format );
void init_genesis_from_state( database& db, const std::string& state_filename, const boost::filesystem::path& p, const boost::any& cfg );

void fill_plugin_options( fc::map< std::string, std::string >& plugin_options );

} } }

FC_REFLECT( steem::chain::statefile::steem_version_info,
Expand All @@ -103,6 +104,7 @@ FC_REFLECT( steem::chain::statefile::steem_version_info,

FC_REFLECT( steem::chain::statefile::state_header,
(version)
(plugin_options)
(sections)
)

Expand Down
275 changes: 146 additions & 129 deletions libraries/chain/statefile/load_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,147 +10,164 @@ namespace steem { namespace chain { namespace statefile {

void init_genesis_from_state( database& db, const std::string& state_filename, const boost::filesystem::path& p, const boost::any& cfg )
{
std::ifstream input_stream( state_filename, std::ios::binary );

std::stringbuf top_header_buf;
input_stream.get( top_header_buf );
state_header top_header = fc::json::from_string( top_header_buf.str() ).as< state_header >();
steem_version_info expected_version = steem_version_info( db );

FC_ASSERT( top_header.version.db_format_version == expected_version.db_format_version, "DB Format Version mismatch" );
FC_ASSERT( top_header.version.network_type == expected_version.network_type, "Network Type mismatch" );
FC_ASSERT( top_header.version.chain_id == expected_version.chain_id, "Chain ID mismatch" );

db.set_revision( top_header.version.head_block_num );

flat_map< std::string, std::shared_ptr< index_info > > index_map;
db.for_each_index_extension< index_info >( [&]( std::shared_ptr< index_info > info )
{
std::string name;
info->get_schema()->get_name( name );
index_map[ name ] = info;
});

flat_map< std::string, section_header > header_map;
for( const auto& header : top_header.sections )
{
object_section obj_header = header.get< object_section >();
header_map.insert_or_assign( obj_header.object_type, std::move( obj_header ) );
}

for( const auto& idx : index_map )
{
auto itr = header_map.find( idx.first );
FC_ASSERT( itr != header_map.end(), "Did not find expected object index: ${o}", ("o", idx.first) );

std::string expected_schema;
idx.second->get_schema()->get_str_schema( expected_schema );

FC_TODO( "Version object data to allow for upgrading during load" );
FC_ASSERT( expected_schema == itr->second.get< object_section >().schema,
"Unexpected incoming schema for object ${o}.\nExpected: ${e}\nActual: ${a}",
("o", idx.first)("e", expected_schema)("a", itr->second.get< object_section >().schema) );
}

input_stream.seekg( -sizeof( int64_t ), std::ios::end );
int64_t footer_pos;
input_stream.read( (char*)&footer_pos, sizeof( int64_t ) );

input_stream.seekg( footer_pos );
std::stringbuf footer_buf;
input_stream.get( footer_buf );

state_footer top_footer = fc::json::from_string( footer_buf.str() ).as< state_footer >();
flat_map< std::string, section_footer > footer_map;

for( size_t i = 0; i < top_footer.section_footers.size(); i++ )
{
std::string& name = top_header.sections[ i ].get< object_section >().object_type;
footer_map[ name ] = top_footer.section_footers[i];
}

char buffer[ 1024 ];
fc::sha256::encoder enc;

for( const auto& i : index_map )
{
vector< char > section_header_bin;
char c;

object_section header = header_map[ i.first ].get< object_section >();

input_stream.seekg( footer_map[ i.first ].begin_offset );
int64_t begin = input_stream.tellg();

std::stringbuf header_buf;
input_stream.get( header_buf );
object_section s_header = fc::json::from_string( header_buf.str() ).as< section_header >().get< object_section >();
input_stream.read( &c, 1 );

FC_ASSERT( header.object_type == s_header.object_type, "Expected next object type: ${e} actual: ${a}",
("e", header.object_type)("a", s_header.object_type) );
FC_ASSERT( header.format == s_header.format, "Mismatched object format for ${o}.",
("o", header.object_type) );
FC_ASSERT( header.object_count == s_header.object_count, "Mismatched object count for ${o}.",
("o", header.object_type) );
FC_ASSERT( header.schema == s_header.schema, "Mismatched object schema for ${o}.",
("o", header.object_type) );

ilog( "Unpacking ${o}. (${n} Objects)", ("o", header.object_type)("n", header.object_count) );
std::shared_ptr< index_info > idx = index_map[ header.object_type ];

#ifdef ENABLE_MIRA
idx->set_index_type( db, mira::index_type::bmic, p, cfg );
#endif

for( int64_t i = 0; i < header.object_count; i++ )
try {
std::ifstream input_stream( state_filename, std::ios::binary );

std::stringbuf top_header_buf;
input_stream.get( top_header_buf );
state_header top_header = fc::json::from_string( top_header_buf.str() ).as< state_header >();
steem_version_info expected_version = steem_version_info( db );

ilog( "Loading blockchain state from file. Head Block: ${n}", ("n", top_header.version.head_block_num) );

FC_ASSERT( top_header.version.db_format_version == expected_version.db_format_version, "DB Format Version mismatch" );
FC_ASSERT( top_header.version.network_type == expected_version.network_type, "Network Type mismatch" );
FC_ASSERT( top_header.version.chain_id == expected_version.chain_id, "Chain ID mismatch" );

db.set_revision( top_header.version.head_block_num );

flat_map< std::string, std::shared_ptr< index_info > > index_map;
db.for_each_index_extension< index_info >( [&]( std::shared_ptr< index_info > info )
{
if( header.format == FORMAT_BINARY )
{
idx->create_object_from_binary( db, input_stream );
}
else if( header.format == FORMAT_JSON )
{
std::stringbuf object_stream;
input_stream.get( object_stream );
idx->create_object_from_json( db, object_stream.str() );
input_stream.read( &c, 1 );
}
std::string name;
info->get_schema()->get_name( name );
index_map[ name ] = info;
});

flat_map< std::string, section_header > header_map;
for( const auto& header : top_header.sections )
{
object_section obj_header = header.get< object_section >();
header_map.insert_or_assign( obj_header.object_type, std::move( obj_header ) );
}

FC_TODO( "Put index conversion in parallel thread" )
#ifdef ENABLE_MIRA
idx->set_index_type( db, mira::index_type::mira, p, cfg );
#endif
idx->set_next_id( db, header.next_id );
for( const auto& idx : index_map )
{
auto itr = header_map.find( idx.first );
FC_ASSERT( itr != header_map.end(), "Did not find expected object index: ${o}", ("o", idx.first) );

std::string expected_schema;
idx.second->get_schema()->get_str_schema( expected_schema );

FC_TODO( "Version object data to allow for upgrading during load" );
FC_ASSERT( expected_schema == itr->second.get< object_section >().schema,
"Unexpected incoming schema for object ${o}.\nExpected: ${e}\nActual: ${a}",
("o", idx.first)("e", expected_schema)("a", itr->second.get< object_section >().schema) );
}

std::map< std::string, std::string > expected_plugin_options;
fill_plugin_options( expected_plugin_options );

int64_t end = input_stream.tellg();
for( auto& plugin_opt : expected_plugin_options )
{
auto itr = top_header.plugin_options.find( plugin_opt.first );
FC_ASSERT( itr != top_header.plugin_options.end(), "Did not find expected options for plugin: ${p}",
("p", plugin_opt.first) );
FC_ASSERT( plugin_opt.second == itr->second, "Plugin option mismatch for plugin: ${p}.\nExpected: ${e}\nActual: ${a}",
("p", plugin_opt.first)("e", plugin_opt.second)("a", itr->second) );
}

input_stream.seekg( -sizeof( int64_t ), std::ios::end );
int64_t footer_pos;
input_stream.read( (char*)&footer_pos, sizeof( int64_t ) );

input_stream.seekg( footer_pos );
std::stringbuf footer_buf;
input_stream.get( footer_buf );
section_footer s_footer = fc::json::from_string( footer_buf.str() ).as< section_footer >();

FC_ASSERT( s_footer.begin_offset == begin, "Begin offset mismatch for ${o}",
("o", header.object_type) );
FC_ASSERT( s_footer.end_offset == end, "Begin offset mismatch for ${o}",
("o", header.object_type) );
state_footer top_footer = fc::json::from_string( footer_buf.str() ).as< state_footer >();
flat_map< std::string, section_footer > footer_map;

int64_t total = end - begin;
int64_t read = 0;
input_stream.seekg( begin );
enc.reset();

while( read < total )
for( size_t i = 0; i < top_footer.section_footers.size(); i++ )
{
int64_t to_read = std::min( total - read, (int64_t)1024 );
input_stream.read( buffer, to_read );
read += 1024;
enc.write( buffer, to_read );
std::string& name = top_header.sections[ i ].get< object_section >().object_type;
footer_map[ name ] = top_footer.section_footers[i];
}

FC_ASSERT( s_footer.hash == SHA256_PREFIX + enc.result().str(), "Incorrect hash for ${o}. Expectd: ${e} Actual: ${a}",
("o", header.object_type)("e", s_footer.hash)("a", SHA256_PREFIX + enc.result().str()) );
}
char buffer[ 1024 ];
fc::sha256::encoder enc;

for( const auto& i : index_map )
{
vector< char > section_header_bin;
char c;

object_section header = header_map[ i.first ].get< object_section >();

input_stream.seekg( footer_map[ i.first ].begin_offset );
int64_t begin = input_stream.tellg();

std::stringbuf header_buf;
input_stream.get( header_buf );
object_section s_header = fc::json::from_string( header_buf.str() ).as< section_header >().get< object_section >();
input_stream.read( &c, 1 );

FC_ASSERT( header.object_type == s_header.object_type, "Expected next object type: ${e} actual: ${a}",
("e", header.object_type)("a", s_header.object_type) );
FC_ASSERT( header.format == s_header.format, "Mismatched object format for ${o}.",
("o", header.object_type) );
FC_ASSERT( header.object_count == s_header.object_count, "Mismatched object count for ${o}.",
("o", header.object_type) );
FC_ASSERT( header.schema == s_header.schema, "Mismatched object schema for ${o}.",
("o", header.object_type) );

ilog( "Unpacking ${o}. (${n} Objects)", ("o", header.object_type)("n", header.object_count) );
std::shared_ptr< index_info > idx = index_map[ header.object_type ];

#ifdef ENABLE_MIRA
idx->set_index_type( db, mira::index_type::bmic, p, cfg );
#endif

for( int64_t i = 0; i < header.object_count; i++ )
{
if( header.format == FORMAT_BINARY )
{
idx->create_object_from_binary( db, input_stream );
}
else if( header.format == FORMAT_JSON )
{
std::stringbuf object_stream;
input_stream.get( object_stream );
idx->create_object_from_json( db, object_stream.str() );
input_stream.read( &c, 1 );
}
}

FC_TODO( "Put index conversion in parallel thread" )
#ifdef ENABLE_MIRA
idx->set_index_type( db, mira::index_type::mira, p, cfg );
#endif

idx->set_next_id( db, header.next_id );

int64_t end = input_stream.tellg();

std::stringbuf footer_buf;
input_stream.get( footer_buf );
section_footer s_footer = fc::json::from_string( footer_buf.str() ).as< section_footer >();

FC_ASSERT( s_footer.begin_offset == begin, "Begin offset mismatch for ${o}",
("o", header.object_type) );
FC_ASSERT( s_footer.end_offset == end, "Begin offset mismatch for ${o}",
("o", header.object_type) );

int64_t total = end - begin;
int64_t read = 0;
input_stream.seekg( begin );
enc.reset();

while( read < total )
{
int64_t to_read = std::min( total - read, (int64_t)1024 );
input_stream.read( buffer, to_read );
read += 1024;
enc.write( buffer, to_read );
}

FC_ASSERT( s_footer.hash == SHA256_PREFIX + enc.result().str(), "Incorrect hash for ${o}. Expectd: ${e} Actual: ${a}",
("o", header.object_type)("e", s_footer.hash)("a", SHA256_PREFIX + enc.result().str()) );
}
} FC_LOG_AND_RETHROW()
}

} } } // steem::chain::statefile
3 changes: 3 additions & 0 deletions libraries/chain/statefile/save_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ write_state_result write_state( const database& db, const std::string& state_fil

object_serializer ser( state_format );
ser.start_threads();
// Grab plugin options
fill_plugin_options( top_header.plugin_options );

// Grab the object sections
db.for_each_index_extension< index_info >(
[&]( std::shared_ptr< index_info > info )
Expand Down
Loading

0 comments on commit 6121f2d

Please sign in to comment.