Skip to content
Merged
18 changes: 18 additions & 0 deletions tests/bloom_filter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,22 @@ TEST(BloomFilterTests, BloomFilterApplicationLogic) {
EXPECT_TRUE(found_20); // Inserted value must be found
}

// Test: BloomFilter with corrupted/too-small serialization data
// Line 62-64: size < sizeof(uint64_t)*3+1 triggers early return, filter becomes inert
TEST(BloomFilterTests, CorruptedSerialization_InertNoCrash) {
// Minimum valid requires: 3*8 bytes (headers) + 1 byte (bits) = 25 bytes
// Use exactly 25 bytes but with garbage values so bit_bytes validation fails
std::vector<uint8_t> data(25, 0);
data[0] = 0xFF; // num_bits = very large, will fail validation
data[8] = 0xFF; // num_hashes = very large
data[16] = 0xFF; // expected = very large

// Constructor from serialized data - invalid bit_bytes triggers early return
BloomFilter bf(data.data(), data.size());

// Filter should be inert - might_contain always returns false
Value v = Value::make_int64(42);
EXPECT_FALSE(bf.might_contain(v)); // No crash, returns false
}

} // namespace
21 changes: 21 additions & 0 deletions tests/buffer_pool_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,4 +506,25 @@ TEST(BufferPoolTests, FetchPageReadFailure) {
static_cast<void>(std::remove(short_file.c_str()));
}

// Test: Double unpin returns false
// Line 130: unpin returns false when pin_count_ is already zero
TEST(BufferPoolTests, DoubleUnpin_ReturnsFalse) {
static_cast<void>(std::remove("./test_data/double_unpin.db"));
StorageManager disk_manager("./test_data");
BufferPoolManager bpm(2, disk_manager);

const std::string file_name = "double_unpin.db";
uint32_t page_id = 0;
Page* page = bpm.new_page(file_name, &page_id);
ASSERT_NE(page, nullptr);

// First unpin - should succeed
EXPECT_TRUE(bpm.unpin_page(file_name, page_id, true));

// Second unpin on same page - pin_count_ already 0, should return false
EXPECT_FALSE(bpm.unpin_page(file_name, page_id, true));

static_cast<void>(std::remove("./test_data/double_unpin.db"));
}

} // namespace
26 changes: 26 additions & 0 deletions tests/columnar_table_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,30 @@ TEST_F(ColumnarTableTests, SchemaAccessor) {
ASSERT_EQ(retrieved_schema.get_column(1).type(), common::ValueType::TYPE_FLOAT64);
}

// Test: read_batch with start_row beyond table rows returns false
// Line 124: start_row >= row_count_ returns false
TEST_F(ColumnarTableTests, ReadBatch_StartRowBeyondTable) {
const std::string name = "col_test_offset";
cleanup_table(name);

Schema schema;
schema.add_column("id", common::ValueType::TYPE_INT64);

ColumnarTable table(name, *sm_, schema);
ASSERT_TRUE(table.create());

// Insert 5 rows
auto batch = VectorBatch::create(schema);
for (int i = 0; i < 5; i++) {
batch->get_column(0).append(common::Value::make_int64(i));
}
batch->set_row_count(5);
ASSERT_TRUE(table.append_batch(*batch));
ASSERT_EQ(table.row_count(), 5U);

// Query with start_row = 100, way beyond table rows
auto out = VectorBatch::create(schema);
ASSERT_FALSE(table.read_batch(100, 10, *out)); // start_row >= row_count_
}

} // namespace
209 changes: 209 additions & 0 deletions tests/distributed_executor_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,49 @@ TEST_F(DistributedExecutorWithNodesTests, InsertShardRouting) {
EXPECT_TRUE(res.success());
}

// Test: ShuffleFragment returns success=false
// Line 268: when ShuffleFragment RPC returns reply.success=false,
// sets error "Shuffle failed on node: " + reply.error_msg
TEST_F(DistributedExecutorWithNodesTests, ShuffleFragmentFailure_ReturnsError) {
auto srv1 = std::make_unique<network::RpcServer>(6510);
auto srv2 = std::make_unique<network::RpcServer>(6511);
srv1->start();
srv2->start();
servers_.push_back(std::move(srv1));
servers_.push_back(std::move(srv2));

cm_->register_node("node_1", "127.0.0.1", 6510, config::RunMode::Data);
cm_->register_node("node_2", "127.0.0.1", 6511, config::RunMode::Data);

// Handler for ShuffleFragment that returns failure
auto failure_h = [](const network::RpcHeader&, const std::vector<uint8_t>&, int fd) {
network::QueryResultsReply reply;
reply.success = false;
reply.error_msg = "shard rejected shuffle";
network::RpcHeader resp_h;
resp_h.type = network::RpcType::QueryResults;
resp_h.payload_len = static_cast<uint16_t>(reply.serialize().size());
char h_buf[network::RpcHeader::HEADER_SIZE];
resp_h.encode(h_buf);
send(fd, h_buf, network::RpcHeader::HEADER_SIZE, 0);
auto data = reply.serialize();
if (!data.empty()) send(fd, data.data(), data.size(), 0);
};
// Register on ALL servers so shard routing always hits a server with the handler
for (auto& srv : servers_) {
srv->set_handler(network::RpcType::ShuffleFragment, failure_h);
}

auto lexer = std::make_unique<Lexer>("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
Parser parser(std::move(lexer));
auto stmt = parser.parse_statement();
ASSERT_NE(stmt, nullptr);

auto res = exec_->execute(*stmt, "SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
EXPECT_FALSE(res.success());
EXPECT_TRUE(res.error().find("shard rejected shuffle") != std::string::npos);
}

// Test: INSERT with connect failure
// Verifies error handling when node has no active server
TEST_F(DistributedExecutorWithNodesTests, InsertConnectFailure) {
Expand Down Expand Up @@ -1146,6 +1189,22 @@ TEST_F(DistributedExecutorTests, Join_NaturalNotSupported_ReturnsError) {
(void)res;
}

TEST_F(DistributedExecutorWithNodesTests, Join_NonEqualityCondition_ReturnsError) {
// Register a node (no server needed - we want to test the join validation before RPC)
cm_->register_node("node_1", "127.0.0.1", 6499, config::RunMode::Data);

// JOIN with non-equality condition (e.g., t1.id > t2.id) should return error
// because Shuffle Join requires equality join condition
auto lexer = std::make_unique<Lexer>("SELECT * FROM t1 JOIN t2 ON t1.id > t2.id");
Parser parser(std::move(lexer));
auto stmt = parser.parse_statement();
ASSERT_NE(stmt, nullptr);

auto res = exec_->execute(*stmt, "SELECT * FROM t1 JOIN t2 ON t1.id > t2.id");
ASSERT_FALSE(res.success()) << "Non-equality JOIN should return error";
EXPECT_TRUE(res.error().find("equality") != std::string::npos);
}

// ============= broadcast_table Coverage =============

TEST_F(DistributedExecutorWithNodesTests, BroadcastTable_Basic) {
Expand Down Expand Up @@ -1326,4 +1385,154 @@ TEST_F(DistributedExecutorWithNodesTests, BroadcastTable_MultipleNodes_PushesToA
EXPECT_EQ(pushdata_count.load(), 2);
}

// Test: INNER JOIN executes shuffle join path
// Verifies ShuffleFragment RPC is called for INNER JOIN
TEST_F(DistributedExecutorWithNodesTests, InnerJoinShuffle_ExecutesShufflePath) {
auto srv1 = std::make_unique<network::RpcServer>(6450);
auto srv2 = std::make_unique<network::RpcServer>(6451);
srv1->start();
srv2->start();
servers_.push_back(std::move(srv1));
servers_.push_back(std::move(srv2));

cm_->register_node("node_1", "127.0.0.1", 6450, config::RunMode::Data);
cm_->register_node("node_2", "127.0.0.1", 6451, config::RunMode::Data);

std::atomic<int> shuffle_call_count{0};
std::atomic<int> bloom_filter_push_count{0};

auto success_h = [this](const network::RpcHeader&, const std::vector<uint8_t>&, int fd) {
send_success_reply(fd);
};

// Count ShuffleFragment calls to verify join path is being executed
auto counting_success_h = [&shuffle_call_count, this](const network::RpcHeader&,
const std::vector<uint8_t>&, int fd) {
++shuffle_call_count;
send_success_reply(fd);
};

// Count BloomFilterPush calls to verify bloom filter path is exercised
auto bloom_filter_counting_h = [&bloom_filter_push_count, this](const network::RpcHeader&,
const std::vector<uint8_t>&,
int fd) {
++bloom_filter_push_count;
send_success_reply(fd);
};

// Phase 1 shuffle - COUNTING
servers_[0]->set_handler(network::RpcType::ShuffleFragment, counting_success_h);
servers_[1]->set_handler(network::RpcType::ShuffleFragment, counting_success_h);
// BloomFilterBits aggregation
servers_[0]->set_handler(network::RpcType::BloomFilterBits, success_h);
servers_[1]->set_handler(network::RpcType::BloomFilterBits, success_h);
// BloomFilterPush - COUNTED
servers_[0]->set_handler(network::RpcType::BloomFilterPush, bloom_filter_counting_h);
servers_[1]->set_handler(network::RpcType::BloomFilterPush, bloom_filter_counting_h);
// ExecuteFragment for final results
servers_[0]->set_handler(network::RpcType::ExecuteFragment, success_h);
servers_[1]->set_handler(network::RpcType::ExecuteFragment, success_h);

auto lexer = std::make_unique<Lexer>("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
Parser parser(std::move(lexer));
auto stmt = parser.parse_statement();
ASSERT_NE(stmt, nullptr);

auto res = exec_->execute(*stmt, "SELECT * FROM t1 INNER JOIN t2 ON t1.id = t2.id");
EXPECT_TRUE(res.success());
// ShuffleFragment should be called (proves we're in the shuffle join path)
EXPECT_GE(shuffle_call_count.load(), 1);
// BloomFilterPush should also be called for INNER JOIN
EXPECT_GE(bloom_filter_push_count.load(), 1);
}

// Test: RIGHT JOIN skips bloom filter optimization
// Verifies BloomFilterPush RPC is NOT called for RIGHT JOIN (to avoid false negatives)
TEST_F(DistributedExecutorWithNodesTests, RightJoinShuffle_SkipsBloomFilter) {
auto srv1 = std::make_unique<network::RpcServer>(6452);
auto srv2 = std::make_unique<network::RpcServer>(6453);
srv1->start();
srv2->start();
servers_.push_back(std::move(srv1));
servers_.push_back(std::move(srv2));

cm_->register_node("node_1", "127.0.0.1", 6452, config::RunMode::Data);
cm_->register_node("node_2", "127.0.0.1", 6453, config::RunMode::Data);

std::atomic<int> bloom_filter_push_count{0};

auto success_h = [this](const network::RpcHeader&, const std::vector<uint8_t>&, int fd) {
send_success_reply(fd);
};

auto bloom_filter_counting_h = [&bloom_filter_push_count, this](const network::RpcHeader&,
const std::vector<uint8_t>&,
int fd) {
++bloom_filter_push_count;
send_success_reply(fd);
};

// Phase 1 shuffle
servers_[0]->set_handler(network::RpcType::ShuffleFragment, success_h);
servers_[1]->set_handler(network::RpcType::ShuffleFragment, success_h);
// BloomFilterBits aggregation
servers_[0]->set_handler(network::RpcType::BloomFilterBits, success_h);
servers_[1]->set_handler(network::RpcType::BloomFilterBits, success_h);
// BloomFilterPush - COUNTED
servers_[0]->set_handler(network::RpcType::BloomFilterPush, bloom_filter_counting_h);
servers_[1]->set_handler(network::RpcType::BloomFilterPush, bloom_filter_counting_h);
// ExecuteFragment for final results
servers_[0]->set_handler(network::RpcType::ExecuteFragment, success_h);
servers_[1]->set_handler(network::RpcType::ExecuteFragment, success_h);

auto lexer = std::make_unique<Lexer>("SELECT * FROM t1 RIGHT JOIN t2 ON t1.id = t2.id");
Parser parser(std::move(lexer));
auto stmt = parser.parse_statement();
ASSERT_NE(stmt, nullptr);

auto res = exec_->execute(*stmt, "SELECT * FROM t1 RIGHT JOIN t2 ON t1.id = t2.id");
EXPECT_TRUE(res.success());
// BloomFilterPush should NOT be called for RIGHT JOIN (bloom filter skipped)
EXPECT_EQ(bloom_filter_push_count.load(), 0);
}

// Test: SELECT query returns error from data node
// Verifies error propagation when ExecuteFragment returns success=false
// Path: line 611 all_success=false → line 953 res.set_error(errors)
TEST_F(DistributedExecutorWithNodesTests, SelectErrorFromNode_ReturnsError) {
auto srv1 = std::make_unique<network::RpcServer>(6454);
srv1->start();
servers_.push_back(std::move(srv1));

cm_->register_node("node_1", "127.0.0.1", 6454, config::RunMode::Data);

// Handler returns success=false with error message
servers_[0]->set_handler(
network::RpcType::ExecuteFragment,
[](const network::RpcHeader&, const std::vector<uint8_t>& payload, int fd) {
[[maybe_unused]] auto args = network::ExecuteFragmentArgs::deserialize(payload);
network::QueryResultsReply reply;
reply.success = false;
reply.error_msg = "node rejected query";
reply.schema.add_column("id", common::ValueType::TYPE_INT32);
network::RpcHeader resp_h;
resp_h.type = network::RpcType::QueryResults;
resp_h.payload_len = static_cast<uint16_t>(reply.serialize().size());
char h_buf[network::RpcHeader::HEADER_SIZE];
resp_h.encode(h_buf);
send(fd, h_buf, network::RpcHeader::HEADER_SIZE, 0);
auto data = reply.serialize();
if (!data.empty()) send(fd, data.data(), data.size(), 0);
});

auto lexer = std::make_unique<Lexer>("SELECT * FROM test_table");
Parser parser(std::move(lexer));
auto stmt = parser.parse_statement();
ASSERT_NE(stmt, nullptr);

auto res = exec_->execute(*stmt, "SELECT * FROM test_table");
EXPECT_FALSE(res.success());
EXPECT_TRUE(res.error().find("node rejected query") != std::string::npos);
}

} // namespace
11 changes: 11 additions & 0 deletions tests/lock_manager_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,15 @@ TEST(LockManagerTests, LockUpgrade) {
static_cast<void>(lm.unlock(&txn, rid));
}

// Test: unlock on RID that was never locked
// Line 117: returns false when RID not found in lock_table_
TEST(LockManagerTests, UnlockNeverLocked_ReturnsFalse) {
LockManager lm;
Transaction txn(1);
HeapTable::TupleId rid(999, 999); // Never acquired

// Unlock without ever acquiring should return false
EXPECT_FALSE(lm.unlock(&txn, rid));
}

} // namespace
10 changes: 10 additions & 0 deletions tests/parser_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ TEST(ParserTests, GarbageInput) {
EXPECT_EQ(stmt, nullptr);
}

// Test: SELECT without FROM clause
// Line 171-175: parser returns nullptr when FROM is missing
TEST(ParserTests, SelectWithoutFrom) {
auto stmt = parse("SELECT 1");
EXPECT_EQ(stmt, nullptr);

auto stmt2 = parse("SELECT col1");
EXPECT_EQ(stmt2, nullptr);
}

// ============= SELECT Statement Tests =============

TEST(ParserTests, SelectSimple) {
Expand Down
Loading