From 81f1263c275fe0372d79644a4fb3ce5f10e8d317 Mon Sep 17 00:00:00 2001 From: Anastasiia Pnevskaia Date: Tue, 4 Mar 2025 14:01:16 +0100 Subject: [PATCH 1/7] Prefix caching for sequences with embeddings. --- src/cpp/src/block_manager.hpp | 15 ++-- src/cpp/src/continuous_batching_impl.cpp | 7 +- src/cpp/src/model_runner.hpp | 87 +++++++++++++------ src/cpp/src/sequence_group.cpp | 74 ++++++++++++++-- src/cpp/src/sequence_group.hpp | 47 ++++++++-- tests/cpp/scheduler.cpp | 104 +++++++++++++++++++++++ 6 files changed, 278 insertions(+), 56 deletions(-) diff --git a/src/cpp/src/block_manager.hpp b/src/cpp/src/block_manager.hpp index 06f4f52052..af8aeae072 100644 --- a/src/cpp/src/block_manager.hpp +++ b/src/cpp/src/block_manager.hpp @@ -1073,7 +1073,7 @@ class BlockManager { // When add_request() is executed in multiple threads accessing to cached_blocks causes segfault. // The mutex is needed to prevent such segfaults. const std::lock_guard lock(m_cached_blocks_map_mutex); - auto prompt_ids = group->get_prompt_ids(); + auto prompt_len = group->get_prompt_len(); auto sequences = group->get_not_finished_sequences(); OPENVINO_ASSERT(sequences.size() == 1); auto sequence = sequences[0]; @@ -1085,11 +1085,11 @@ class BlockManager { auto& block_table = m_block_table[seq_id]; size_t content_len = 0; - while (content_len < prompt_ids.size()) { + while (content_len < prompt_len) { size_t prev_iteration_content_len = content_len; content_len += m_block_size; - if (content_len > prompt_ids.size()) { - content_len = prompt_ids.size(); + if (content_len > prompt_len) { + content_len = prompt_len; } // restore fully filled blocks auto full_block_hash = sequence->get_hash(content_len); @@ -1101,11 +1101,11 @@ class BlockManager { block->set_timestamp(timestamp); block_table[layer_idx].push_back(block); } - group->update_processed_tokens_num(content_len == prompt_ids.size() ? content_len - 1 : content_len); + group->update_processed_tokens_num(content_len == prompt_len ? 
content_len - 1 : content_len); } else { // restore partially filled block for (size_t i = 1; i < m_block_size; i++) { - if (prev_iteration_content_len + i > prompt_ids.size()) { + if (prev_iteration_content_len + i > prompt_len) { break; } auto hash = sequence->get_hash(prev_iteration_content_len + i); @@ -1118,8 +1118,7 @@ class BlockManager { block->set_timestamp(timestamp); block_table[layer_idx].push_back(block); } - - group->update_processed_tokens_num(prev_iteration_content_len + i == prompt_ids.size() ? prev_iteration_content_len + i - 1 : prev_iteration_content_len + i); + group->update_processed_tokens_num(prev_iteration_content_len + i == prompt_len ? prev_iteration_content_len + i - 1 : prev_iteration_content_len + i); break; } diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index f93433d364..a421e27583 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -269,9 +269,6 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request SequenceGroup::Ptr sequence_group = std::make_shared(request_id, input_ids, sampling_params, m_block_size); if (m_scheduler->get_config().enable_prefix_caching) { - if (m_model_input_type == ModelInputType::EMBEDDINGS) { - OPENVINO_THROW("Prefix caching is not supported for VLM models."); - } m_scheduler->restore_cached_blocks(sequence_group); } @@ -405,6 +402,10 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { free_fork_timer.end(); } + + // append embeddings for generated tokens + if (m_model_input_type == ModelInputType::EMBEDDINGS) + m_model_runner->append_embeddings(m_requests, scheduler_output); // notify requests dropped by handle { diff --git a/src/cpp/src/model_runner.hpp b/src/cpp/src/model_runner.hpp index ae31446096..c9f8e186d1 100644 --- a/src/cpp/src/model_runner.hpp +++ b/src/cpp/src/model_runner.hpp @@ -119,7 +119,6 @@ class ModelRunner { size_t total_num_tokens = 0, 
total_num_blocks = 0; size_t max_context_len_val = 0; size_t hidden_size = 0; - size_t num_generated_ids = 0; OPENVINO_ASSERT(sequence_groups.size() > 0); auto sequence_group_type = sequence_groups[0]->get_sequence_group_type(); if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { @@ -135,9 +134,6 @@ class ModelRunner { total_num_tokens += sequence_group->get_num_scheduled_tokens() * num_sequences; total_num_blocks += sequence_group->get_num_blocks() * num_sequences; max_context_len_val = std::max(max_context_len_val, sequence_group->get_context_len()); - for (auto seq: sequence_group->get_running_sequences()) { - num_generated_ids += seq->get_generated_len(); - } } ov::Tensor @@ -163,27 +159,6 @@ class ModelRunner { if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { OPENVINO_ASSERT(m_embedding.get_request(), "Got sequence group with embeddings, but embeddings model wasn't set."); inputs_embeds_data = inputs_embeds.data(); - - ov::Tensor generated_ids = ov::Tensor(ov::element::i64, {1, num_generated_ids}); - int64_t *generated_ids_data = generated_ids.data(); - size_t pos = 0; - for (size_t i = 0; i < num_sequence_groups; ++i) { - size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i]; - SequenceGroup::CPtr sequence_group = sequence_groups[seq_group_id]; - for (auto seq: sequence_group->get_running_sequences()) { - auto generated_ids = seq->get_generated_ids(); - for (size_t token_idx = 0; token_idx < generated_ids.size(); token_idx++) { - generated_ids_data[pos] = generated_ids[token_idx]; - pos++; - } - } - } - if (pos > 0) { - // TODO: Compute embeddings only for last generated token, while previously generated embeddings save in SequenceGroup - generated_ids_embeds = m_embedding.infer(generated_ids); - generated_ids_embeds_data = generated_ids_embeds.data(); - } - } else if (sequence_group_type == SequenceGroupType::TOKENS) { input_ids_data = input_ids.data(); } @@ -234,8 +209,8 @@ class ModelRunner { 
sequence_group->get_prompt_ids()[position_id] : sequence->get_generated_ids()[position_id - prompt_len]; } else if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { - auto embeds_pos = position_id < prompt_len ? 0 : hidden_size * (position_id - prompt_len); - const float* src = position_id < prompt_len ? sequence_group->get_input_embeds()[position_id].data() : generated_ids_embeds_data + embeds_pos; + auto generated_embeds = sequence->get_generated_ids_embeds(); + const float* src = position_id < prompt_len ? sequence_group->get_input_embeds()[position_id].data() : generated_embeds[position_id - prompt_len].data(); std::copy_n(src, hidden_size, inputs_embeds_data + token_id * hidden_size); } else { OPENVINO_THROW("Unknown model inputs type."); @@ -271,7 +246,6 @@ class ModelRunner { input_ids_data += num_scheduled_tokens; } else if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { inputs_embeds_data += num_scheduled_tokens * hidden_size; - generated_ids_embeds_data += sequence->get_generated_len() * hidden_size; } position_ids_data += num_scheduled_tokens; @@ -337,6 +311,63 @@ class ModelRunner { return m_request.get_tensor("logits"); } + void append_embeddings(const std::vector & sequence_groups, const Scheduler::Output& scheduler_output) { + size_t num_sequence_groups = scheduler_output.m_scheduled_sequence_groups_ids.size(); + size_t num_generated_ids_without_embeddings = 0; + OPENVINO_ASSERT(sequence_groups.size() > 0); + + // compute aggregated values + for (size_t i = 0; i < num_sequence_groups; ++i) { + size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i]; + SequenceGroup::CPtr sequence_group = sequence_groups[seq_group_id]; + size_t num_sequences = sequence_group->num_running_seqs(); + OPENVINO_ASSERT(sequence_group->get_sequence_group_type() == SequenceGroupType::EMBEDDINGS); + for (auto seq: sequence_group->get_running_sequences()) { + num_generated_ids_without_embeddings += seq->get_generated_len() - 
seq->get_generated_ids_embeds().size(); + } + } + size_t hidden_size = sequence_groups[0]->get_hidden_size(); + + ov::Tensor generated_ids_embeds; + float *generated_ids_embeds_data = nullptr; + + OPENVINO_ASSERT(m_embedding.get_request(), "Got sequence group with embeddings, but embeddings model wasn't set."); + + ov::Tensor generated_ids = ov::Tensor(ov::element::i64, {1, num_generated_ids_without_embeddings}); + int64_t *generated_ids_data = generated_ids.data(); + size_t pos = 0; + for (size_t i = 0; i < num_sequence_groups; ++i) { + size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i]; + SequenceGroup::CPtr sequence_group = sequence_groups[seq_group_id]; + for (auto seq: sequence_group->get_running_sequences()) { + auto generated_ids = seq->get_generated_ids(); + for (size_t token_idx = seq->get_generated_ids_embeds().size(); token_idx < generated_ids.size(); token_idx++) { + generated_ids_data[pos] = generated_ids[token_idx]; + pos++; + } + } + } + if (pos > 0) { + generated_ids_embeds = m_embedding.infer(generated_ids); + generated_ids_embeds_data = generated_ids_embeds.data(); + + size_t embeds_pos = 0; // running offset into generated_ids_embeds; accumulates across ALL scheduled groups, matching how generated_ids was packed above + for (size_t i = 0; i < num_sequence_groups; ++i) { + size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i]; + SequenceGroup::Ptr sequence_group = sequence_groups[seq_group_id]; + for (auto seq: sequence_group->get_running_sequences()) { + auto generated_ids = seq->get_generated_ids(); + size_t new_embeds_count = seq->get_generated_len() - seq->get_generated_ids_embeds().size(); + ov::Coordinate start{0, embeds_pos, 0}; + ov::Coordinate end{1, embeds_pos + new_embeds_count, hidden_size}; + ov::Tensor embedding(generated_ids_embeds, start, end); + seq->append_generated_ids_embeds(embedding); + embeds_pos += new_embeds_count; + } + } + } + } + private: void _fill_indices_from_block_tables( const std::vector& dst_tensor_names, diff --git a/src/cpp/src/sequence_group.cpp b/src/cpp/src/sequence_group.cpp index
7b9265db1a..d90a742b71 100644 --- a/src/cpp/src/sequence_group.cpp +++ b/src/cpp/src/sequence_group.cpp @@ -22,22 +22,78 @@ size_t Sequence::_make_hash(size_t content_length) { size_t prefix_hashes_needed_count = block_start_idx / block_size; OPENVINO_ASSERT(prefix_hashes_needed_count <= m_prefix_hashes.size()); content.insert(content.end(), m_prefix_hashes.begin(), m_prefix_hashes.begin() + prefix_hashes_needed_count); + char* data; + std::size_t size; // get tokens corresponding to current block - const auto prompt_ids = sequence_group->get_prompt_ids(); - OPENVINO_ASSERT(content_length <= prompt_ids.size() + m_generated_ids.size()); - if (block_start_idx < prompt_ids.size()) { - content.insert(content.end(), prompt_ids.begin() + block_start_idx, prompt_ids.begin() + std::min(prompt_ids.size(), content_length)); + if (sequence_group->get_sequence_group_type() == SequenceGroupType::TOKENS) { + const auto prompt_ids = sequence_group->get_prompt_ids(); + OPENVINO_ASSERT(content_length <= prompt_ids.size() + m_generated_ids.size()); + if (block_start_idx < prompt_ids.size()) { + content.insert(content.end(), prompt_ids.begin() + block_start_idx, prompt_ids.begin() + std::min(prompt_ids.size(), content_length)); + } + if (content_length > prompt_ids.size()) { + size_t start = block_start_idx < prompt_ids.size() ? 0 : block_start_idx - prompt_ids.size(); + content.insert(content.end(), m_generated_ids.begin() + start, m_generated_ids.begin() + content_length - prompt_ids.size()); + } + data = reinterpret_cast(content.data()); + size = content.size() * sizeof(content[0]); } - if (content_length > prompt_ids.size()) { - size_t start = block_start_idx < prompt_ids.size() ? 
0 : block_start_idx - prompt_ids.size(); - content.insert(content.end(), m_generated_ids.begin() + start, m_generated_ids.begin() + content_length - prompt_ids.size()); + else if (sequence_group->get_sequence_group_type() == SequenceGroupType::EMBEDDINGS) { + const auto input_embeds = sequence_group->get_input_embeds(); + const auto generated_embeds = m_generated_ids_embeds; + OPENVINO_ASSERT(content_length <= input_embeds.size() + generated_embeds.size()); + std::vector content_float; + + // get inputs embeddings + if (block_start_idx < input_embeds.size()) { + for (size_t idx = block_start_idx; idx < std::min(input_embeds.size(), content_length); idx++) { + auto embed = _reduce_embedding(input_embeds[idx]); + const char* embed_char = reinterpret_cast(embed.data()); + content_float.insert(content_float.end(), embed.begin(), embed.end()); + } + } + + // get generated ids embeddings + if (content_length > input_embeds.size()) { + size_t start = block_start_idx < input_embeds.size() ? 0 : block_start_idx - input_embeds.size(); + for (size_t idx = start; idx < content_length - input_embeds.size(); idx++) { + auto embed = _reduce_embedding(generated_embeds[idx]); + content_float.insert(content_float.end(), embed.begin(), embed.end()); + } + } + + size_t prev_hashes_size = content.size() == 0 ? 
0 : content.size() * sizeof(content[0]); + size_t content_float_size = content_float.size() * sizeof(content_float[0]); + size = prev_hashes_size + content_float_size; + data = new char[size]; + + // append previously calculated prefix hashes if they are available + if (prev_hashes_size) { + auto prev_hashes = reinterpret_cast(content.data()); + std::copy_n(prev_hashes, prev_hashes_size, data); + } + + auto content_char = reinterpret_cast(content_float.data()); + std::copy_n(content_char, content_float_size, data + prev_hashes_size); } - const char* data = reinterpret_cast(content.data()); - std::size_t size = content.size() * sizeof(content[0]); + else { + OPENVINO_THROW("Hash calculation is not supported for this sequence type."); + } + auto hash = std::hash{}(std::string_view(data, size)); return std::hash{}(std::string_view(data, size)); } +std::vector Sequence::_reduce_embedding(const std::vector& embedding) { + size_t s = embedding.size(); + size_t res_size = std::min((size_t)ceil(float(embedding.size()) / m_embeddings_hash_calculation_stride), m_embeddings_hash_max_num_values); + std::vector res(res_size); + for (size_t i = 0, idx=0; idx < res_size; i+= m_embeddings_hash_calculation_stride, idx++) { + res[idx] = embedding[i]; + } + return res; +} + // Each KV block can be uniquely identified by // the tokens within the block and the tokens in the prefix before the block. 
// hash(prefix tokens + block tokens) <--> KV Block diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index 7da341f43f..af519a67e7 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -49,17 +49,28 @@ class Sequence { std::vector m_prefix_hashes; SequenceGroup* m_sequence_group = nullptr; static std::mutex m_counter_mutex; + std::vector> m_generated_ids_embeds; + SequenceGroupType m_type; + size_t m_hidden_size; + + // Embeddings hash calculation params + const size_t m_embeddings_hash_max_num_values = 10; // max number of values used for embeddings hash calculation + const size_t m_embeddings_hash_calculation_stride = 50; // the stride with which values are taken from embeddings vector size_t _make_hash(size_t content_length); - explicit Sequence(const uint64_t id) : m_grouped_id(id) {} + std::vector _reduce_embedding(const std::vector& embedding); + + explicit Sequence(const uint64_t id, const SequenceGroupType type, const size_t hidden_size) : m_grouped_id(id), m_type(type), m_hidden_size(hidden_size) {} Sequence(const Sequence& seq, const uint64_t id) : m_generated_ids(seq.m_generated_ids), m_grouped_id(id), m_status(seq.m_status), m_cumulative_log_prob(seq.m_cumulative_log_prob), - m_sequence_group(seq.m_sequence_group) { + m_sequence_group(seq.m_sequence_group), + m_type(seq.m_type), + m_hidden_size(seq.m_hidden_size) { OPENVINO_ASSERT(seq.m_id != m_id); } @@ -67,8 +78,8 @@ class Sequence { using Ptr = std::shared_ptr; using CPtr = std::shared_ptr; - static Sequence::Ptr create(const uint64_t id) { - return Sequence::Ptr(new Sequence(id)); + static Sequence::Ptr create(const uint64_t id, const SequenceGroupType type = SequenceGroupType::TOKENS, const size_t hidden_size = 0) { + return Sequence::Ptr(new Sequence(id, type, hidden_size)); } static Sequence::Ptr fork(Sequence::CPtr sequence, const uint64_t id) { @@ -191,6 +202,25 @@ class Sequence { m_sequence_group = sequence_group; } + std::vector> 
get_generated_ids_embeds() const{ + OPENVINO_ASSERT(m_type == ov::genai::SequenceGroupType::EMBEDDINGS); + return m_generated_ids_embeds; + } + + void append_generated_ids_embeds(ov::Tensor generated_ids_embeds) { + OPENVINO_ASSERT(m_type == SequenceGroupType::EMBEDDINGS); + auto embeds_count = generated_ids_embeds.get_shape()[1]; + OPENVINO_ASSERT(m_hidden_size == generated_ids_embeds.get_shape()[2]); + + auto current_embeds_size = m_generated_ids_embeds.size(); + for (size_t i = current_embeds_size, idx = 0; i < current_embeds_size + embeds_count; i++, idx++) { + m_generated_ids_embeds.emplace_back(std::vector()); + m_generated_ids_embeds[i].resize(m_hidden_size); + std::copy_n(generated_ids_embeds.data() + idx * m_hidden_size, m_hidden_size, m_generated_ids_embeds[i].begin()); + + } + } + std::shared_ptr get_sequence_group_ptr() const; // Each KV block can be uniquely identified by @@ -261,6 +291,7 @@ class SequenceGroup : public std::enable_shared_from_this { : SequenceGroup(request_id, sampling_params, block_size) { size_t prompt_len; + size_t hidden_size = 0; if (input_ids.get_shape().size() > 1) { prompt_len = input_ids.get_shape()[1]; } else { @@ -273,11 +304,11 @@ class SequenceGroup : public std::enable_shared_from_this { std::copy_n(input_ids.data(), prompt_len, m_prompt_ids.begin()); m_sequence_group_type = SequenceGroupType::TOKENS; } else if (input_ids.get_element_type() == ov::element::f32) { - auto embeds_len = input_ids.get_shape()[2]; + hidden_size = input_ids.get_shape()[2]; m_input_embeds.resize(prompt_len); for (size_t i = 0; i < prompt_len; i++) { - m_input_embeds[i].resize(embeds_len); - std::copy_n(input_ids.data() + i * embeds_len, embeds_len, m_input_embeds[i].begin()); + m_input_embeds[i].resize(hidden_size); + std::copy_n(input_ids.data() + i * hidden_size, hidden_size, m_input_embeds[i].begin()); } m_sequence_group_type = SequenceGroupType::EMBEDDINGS; } @@ -287,7 +318,7 @@ class SequenceGroup : public std::enable_shared_from_this { 
m_prompt_log_probs.reserve(prompt_len); // create a single sequence - add_sequence(Sequence::create(m_next_sequence_id++)); + add_sequence(Sequence::create(m_next_sequence_id++, m_sequence_group_type, hidden_size)); } void add_sequence(const Sequence::Ptr & sequence) { diff --git a/tests/cpp/scheduler.cpp b/tests/cpp/scheduler.cpp index ee4d8d518a..40ea18381e 100644 --- a/tests/cpp/scheduler.cpp +++ b/tests/cpp/scheduler.cpp @@ -29,6 +29,19 @@ std::shared_ptr init_cache_manager(SchedulerConfig scheduler_confi return std::make_shared(request, kv_head_configs); } +ov::Tensor embeds_matrix_to_tensor(std::vector> vec) { + size_t hidden_size = vec[0].size(); + ov::Tensor res = ov::Tensor(ov::element::f32, {1, vec.size(), hidden_size}); + auto res_data = res.data(); + size_t pos = 0; + for (size_t i = 0; i < vec.size(); i ++) { + for (size_t j = 0; j < hidden_size; j++) { + res_data[pos++] = vec[i][j]; + } + } + return res; +} + TEST(TestScheduler, general_test) { std::array configs = {SchedulerConfig(), SchedulerConfig()}; configs.at(0).max_num_batched_tokens = 32; @@ -1011,3 +1024,94 @@ TEST(TestScheduler, FullyPreemptsCacheEvictedSequences) { } } } + +TEST(TestScheduler, prefix_caching_embeddings_test) { + std::array configs = {SchedulerConfig(), SchedulerConfig()}; + configs.at(0).max_num_batched_tokens = 32; + configs.at(0).num_kv_blocks = 100; + configs.at(0).dynamic_split_fuse = false; + configs.at(0).max_num_seqs = 5; + configs.at(0).enable_prefix_caching = true; + configs.at(1).max_num_batched_tokens = 32; + configs.at(1).num_kv_blocks = 100; + configs.at(1).dynamic_split_fuse = true; + configs.at(1).max_num_seqs = 5; + configs.at(1).enable_prefix_caching = true; + for (auto scheduler_config: configs) { + size_t hidden_size = 300; + std::vector> prompt_embeddings; + for (size_t i = 0; i < 8; i++) { + prompt_embeddings.emplace_back(std::vector()); + for (size_t j = 0; j < hidden_size; j++) { + prompt_embeddings[i].push_back(i * hidden_size + j + (float)j * 0.05); 
+ } + } + std::vector> histrory_embeddings = {}; + // schedule prompt + Scheduler scheduler = Scheduler(4, init_cache_manager(scheduler_config), scheduler_config); + + size_t chat_iterations = 10; + + for (size_t chat_iteration = 0; chat_iteration < chat_iterations; chat_iteration++) { + std::vector> embeddings = histrory_embeddings; + embeddings.insert(embeddings.end(), prompt_embeddings.begin(), prompt_embeddings.end()); + SequenceGroup::Ptr sequence_group = std::make_shared(0, embeds_matrix_to_tensor(embeddings), ov::genai::greedy(), 4); + scheduler.restore_cached_blocks(sequence_group); + std::vector requests = {sequence_group}; + + auto out1 = scheduler.schedule(requests); + if (chat_iteration == 0) + EXPECT_EQ(out1.m_total_num_scheduled_tokens, prompt_embeddings.size()); + else + { + EXPECT_EQ(out1.m_total_num_scheduled_tokens, prompt_embeddings.size() + 1); + } + for (auto seq: requests) { + std::vector running_sequences = seq->get_running_sequences(); + running_sequences[0]->append_token(chat_iteration, 0.7); + + std::vector embed(hidden_size); + for (size_t i = 0; i < hidden_size; i++) { + embed[i] = chat_iteration + i * hidden_size + (float)i * 0.05; + } + running_sequences[0]->append_generated_ids_embeds(embeds_matrix_to_tensor({embed})); + seq->finish_iteration(); + } + + // schedule generate + size_t num_generate_tokens = 10; + for (size_t i = 0; i < num_generate_tokens; i++) { + auto out2 = scheduler.schedule(requests); + EXPECT_EQ(out2.m_total_num_scheduled_tokens, 1); + for (auto seq: requests) { + std::vector running_sequences = seq->get_running_sequences(); + running_sequences[0]->append_token(16 + chat_iteration, 0.9); + std::vector embed(hidden_size); + for (size_t i = 0; i < hidden_size; i++) { + embed[i] = chat_iteration + i * hidden_size + (float)i * 0.05; + } + running_sequences[0]->append_generated_ids_embeds(embeds_matrix_to_tensor({embed})); + seq->finish_iteration(); + } + } + + // finish sequence + auto sequence = 
requests[0]->get_running_sequences()[0]; + sequence->set_status(SequenceStatus::FINISHED); + auto idx0 = sequence->get_id(); + scheduler.free_sequence(idx0); + auto generated_embeddings = sequence->get_generated_ids_embeds(); + + histrory_embeddings.insert(histrory_embeddings.end(), prompt_embeddings.begin(), prompt_embeddings.end()); + histrory_embeddings.insert(histrory_embeddings.end(), generated_embeddings.begin(), generated_embeddings.end()); + + for (auto& seq : sequence_group->get_sequences()) { + if (seq->get_id() == idx0) { + continue; + } + scheduler.free_sequence(seq->get_id()); + } + } + } + +} \ No newline at end of file From e6ec4942d1097a5b5dfc91886f031a6989ab5b06 Mon Sep 17 00:00:00 2001 From: Anastasiia Pnevskaia Date: Tue, 4 Mar 2025 15:12:02 +0100 Subject: [PATCH 2/7] Apply suggestions from code review Co-authored-by: Ilya Lavrenov --- src/cpp/src/model_runner.hpp | 4 ++-- src/cpp/src/sequence_group.cpp | 3 +-- src/cpp/src/sequence_group.hpp | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/cpp/src/model_runner.hpp b/src/cpp/src/model_runner.hpp index c9f8e186d1..de685b09d5 100644 --- a/src/cpp/src/model_runner.hpp +++ b/src/cpp/src/model_runner.hpp @@ -209,7 +209,7 @@ class ModelRunner { sequence_group->get_prompt_ids()[position_id] : sequence->get_generated_ids()[position_id - prompt_len]; } else if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { - auto generated_embeds = sequence->get_generated_ids_embeds(); + const auto& generated_embeds = sequence->get_generated_ids_embeds(); const float* src = position_id < prompt_len ? 
sequence_group->get_input_embeds()[position_id].data() : generated_embeds[position_id - prompt_len].data(); std::copy_n(src, hidden_size, inputs_embeds_data + token_id * hidden_size); } else { @@ -340,7 +340,7 @@ class ModelRunner { size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i]; SequenceGroup::CPtr sequence_group = sequence_groups[seq_group_id]; for (auto seq: sequence_group->get_running_sequences()) { - auto generated_ids = seq->get_generated_ids(); + const auto& generated_ids = seq->get_generated_ids(); for (size_t token_idx = seq->get_generated_ids_embeds().size(); token_idx < generated_ids.size(); token_idx++) { generated_ids_data[pos] = generated_ids[token_idx]; pos++; diff --git a/src/cpp/src/sequence_group.cpp b/src/cpp/src/sequence_group.cpp index d90a742b71..a21f176d0d 100644 --- a/src/cpp/src/sequence_group.cpp +++ b/src/cpp/src/sequence_group.cpp @@ -40,7 +40,7 @@ size_t Sequence::_make_hash(size_t content_length) { size = content.size() * sizeof(content[0]); } else if (sequence_group->get_sequence_group_type() == SequenceGroupType::EMBEDDINGS) { - const auto input_embeds = sequence_group->get_input_embeds(); + const auto& input_embeds = sequence_group->get_input_embeds(); const auto generated_embeds = m_generated_ids_embeds; OPENVINO_ASSERT(content_length <= input_embeds.size() + generated_embeds.size()); std::vector content_float; @@ -49,7 +49,6 @@ size_t Sequence::_make_hash(size_t content_length) { if (block_start_idx < input_embeds.size()) { for (size_t idx = block_start_idx; idx < std::min(input_embeds.size(), content_length); idx++) { auto embed = _reduce_embedding(input_embeds[idx]); - const char* embed_char = reinterpret_cast(embed.data()); content_float.insert(content_float.end(), embed.begin(), embed.end()); } } diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index af519a67e7..8db00a2bb2 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -202,7 +202,7 @@ 
class Sequence { m_sequence_group = sequence_group; } - std::vector> get_generated_ids_embeds() const{ + const std::vector>& get_generated_ids_embeds() const { OPENVINO_ASSERT(m_type == ov::genai::SequenceGroupType::EMBEDDINGS); return m_generated_ids_embeds; } From 90b103cddb4eddca163a54fe82bd43653cd9c5ce Mon Sep 17 00:00:00 2001 From: Anastasiia Pnevskaia Date: Tue, 4 Mar 2025 16:30:58 +0100 Subject: [PATCH 3/7] Applied comments. --- src/cpp/src/sequence_group.cpp | 31 +++++++------------------------ src/cpp/src/sequence_group.hpp | 7 ++++--- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/src/cpp/src/sequence_group.cpp b/src/cpp/src/sequence_group.cpp index a21f176d0d..5a97ddc5fc 100644 --- a/src/cpp/src/sequence_group.cpp +++ b/src/cpp/src/sequence_group.cpp @@ -22,8 +22,6 @@ size_t Sequence::_make_hash(size_t content_length) { size_t prefix_hashes_needed_count = block_start_idx / block_size; OPENVINO_ASSERT(prefix_hashes_needed_count <= m_prefix_hashes.size()); content.insert(content.end(), m_prefix_hashes.begin(), m_prefix_hashes.begin() + prefix_hashes_needed_count); - char* data; - std::size_t size; // get tokens corresponding to current block if (sequence_group->get_sequence_group_type() == SequenceGroupType::TOKENS) { @@ -36,20 +34,17 @@ size_t Sequence::_make_hash(size_t content_length) { size_t start = block_start_idx < prompt_ids.size() ? 
0 : block_start_idx - prompt_ids.size(); content.insert(content.end(), m_generated_ids.begin() + start, m_generated_ids.begin() + content_length - prompt_ids.size()); } - data = reinterpret_cast(content.data()); - size = content.size() * sizeof(content[0]); } else if (sequence_group->get_sequence_group_type() == SequenceGroupType::EMBEDDINGS) { const auto& input_embeds = sequence_group->get_input_embeds(); const auto generated_embeds = m_generated_ids_embeds; OPENVINO_ASSERT(content_length <= input_embeds.size() + generated_embeds.size()); - std::vector content_float; // get inputs embeddings if (block_start_idx < input_embeds.size()) { for (size_t idx = block_start_idx; idx < std::min(input_embeds.size(), content_length); idx++) { auto embed = _reduce_embedding(input_embeds[idx]); - content_float.insert(content_float.end(), embed.begin(), embed.end()); + content.insert(content.end(), embed.begin(), embed.end()); } } @@ -58,37 +53,25 @@ size_t Sequence::_make_hash(size_t content_length) { size_t start = block_start_idx < input_embeds.size() ? 0 : block_start_idx - input_embeds.size(); for (size_t idx = start; idx < content_length - input_embeds.size(); idx++) { auto embed = _reduce_embedding(generated_embeds[idx]); - content_float.insert(content_float.end(), embed.begin(), embed.end()); + content.insert(content.end(), embed.begin(), embed.end()); } } - - size_t prev_hashes_size = content.size() == 0 ? 
0 : content.size() * sizeof(content[0]); - size_t content_float_size = content_float.size() * sizeof(content_float[0]); - size = prev_hashes_size + content_float_size; - data = new char[size]; - - // append previously calculated prefix hashes if they are available - if (prev_hashes_size) { - auto prev_hashes = reinterpret_cast(content.data()); - std::copy_n(prev_hashes, prev_hashes_size, data); - } - - auto content_char = reinterpret_cast(content_float.data()); - std::copy_n(content_char, content_float_size, data + prev_hashes_size); } else { OPENVINO_THROW("Hash calculation is not supported for this sequence type."); } + char* data = reinterpret_cast(content.data()); + std::size_t size = content.size() * sizeof(content[0]); auto hash = std::hash{}(std::string_view(data, size)); return std::hash{}(std::string_view(data, size)); } -std::vector Sequence::_reduce_embedding(const std::vector& embedding) { +std::vector Sequence::_reduce_embedding(const std::vector& embedding) { size_t s = embedding.size(); size_t res_size = std::min((size_t)ceil(float(embedding.size()) / m_embeddings_hash_calculation_stride), m_embeddings_hash_max_num_values); - std::vector res(res_size); + std::vector res(res_size); for (size_t i = 0, idx=0; idx < res_size; i+= m_embeddings_hash_calculation_stride, idx++) { - res[idx] = embedding[i]; + res[idx] = std::round(embedding[i] * m_multiplier); } return res; } diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index 8db00a2bb2..09290d1f53 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -54,12 +54,13 @@ class Sequence { size_t m_hidden_size; // Embeddings hash calculation params - const size_t m_embeddings_hash_max_num_values = 10; // max number of values used for embeddings hash calculation - const size_t m_embeddings_hash_calculation_stride = 50; // the stride with which values are taken from embeddings vector + static constexpr size_t m_embeddings_hash_max_num_values = 10; // max 
number of values used for embeddings hash calculation + static constexpr size_t m_embeddings_hash_calculation_stride = 50; // the stride with which values are taken from embeddings vector + static constexpr size_t m_multiplier = 10000; // multiplier by which float values are multiplied before conversion to size_t size_t _make_hash(size_t content_length); - std::vector _reduce_embedding(const std::vector& embedding); + static std::vector _reduce_embedding(const std::vector& embedding); explicit Sequence(const uint64_t id, const SequenceGroupType type, const size_t hidden_size) : m_grouped_id(id), m_type(type), m_hidden_size(hidden_size) {} From 1d7dc19d698f060fcff94dbcd3e3b12d21b38387 Mon Sep 17 00:00:00 2001 From: Anastasiia Pnevskaia Date: Tue, 4 Mar 2025 16:36:03 +0100 Subject: [PATCH 4/7] Minor fix. --- src/cpp/src/sequence_group.cpp | 4 ++-- src/cpp/src/sequence_group.hpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cpp/src/sequence_group.cpp b/src/cpp/src/sequence_group.cpp index 5a97ddc5fc..b6436801df 100644 --- a/src/cpp/src/sequence_group.cpp +++ b/src/cpp/src/sequence_group.cpp @@ -66,10 +66,10 @@ size_t Sequence::_make_hash(size_t content_length) { return std::hash{}(std::string_view(data, size)); } -std::vector Sequence::_reduce_embedding(const std::vector& embedding) { +std::vector Sequence::_reduce_embedding(const std::vector& embedding) { size_t s = embedding.size(); size_t res_size = std::min((size_t)ceil(float(embedding.size()) / m_embeddings_hash_calculation_stride), m_embeddings_hash_max_num_values); - std::vector res(res_size); + std::vector res(res_size); for (size_t i = 0, idx=0; idx < res_size; i+= m_embeddings_hash_calculation_stride, idx++) { res[idx] = std::round(embedding[i] * m_multiplier); } diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index 09290d1f53..da4ff503ca 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -60,7 +60,7 @@ class Sequence { 
size_t _make_hash(size_t content_length); - static std::vector _reduce_embedding(const std::vector& embedding); + static std::vector _reduce_embedding(const std::vector& embedding); explicit Sequence(const uint64_t id, const SequenceGroupType type, const size_t hidden_size) : m_grouped_id(id), m_type(type), m_hidden_size(hidden_size) {} From 9b346fff308ea450296f5ecec5df26d1cf74d0bc Mon Sep 17 00:00:00 2001 From: Anastasiia Pnevskaia Date: Tue, 4 Mar 2025 16:43:24 +0100 Subject: [PATCH 5/7] Minor correction. --- src/cpp/src/sequence_group.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/cpp/src/sequence_group.cpp b/src/cpp/src/sequence_group.cpp index b6436801df..3dc7ed5686 100644 --- a/src/cpp/src/sequence_group.cpp +++ b/src/cpp/src/sequence_group.cpp @@ -60,9 +60,8 @@ size_t Sequence::_make_hash(size_t content_length) { else { OPENVINO_THROW("Hash calculation is not supported for this sequence type."); } - char* data = reinterpret_cast(content.data()); + const char* data = reinterpret_cast(content.data()); std::size_t size = content.size() * sizeof(content[0]); - auto hash = std::hash{}(std::string_view(data, size)); return std::hash{}(std::string_view(data, size)); } From f270c455f935b89dc0bb9b44ecc5be6d1fe7a5ad Mon Sep 17 00:00:00 2001 From: Anastasiia Pnevskaia Date: Tue, 4 Mar 2025 16:44:39 +0100 Subject: [PATCH 6/7] Minor correction. 
--- src/cpp/src/sequence_group.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpp/src/sequence_group.cpp b/src/cpp/src/sequence_group.cpp index 3dc7ed5686..9c5b31a6e7 100644 --- a/src/cpp/src/sequence_group.cpp +++ b/src/cpp/src/sequence_group.cpp @@ -60,7 +60,7 @@ size_t Sequence::_make_hash(size_t content_length) { else { OPENVINO_THROW("Hash calculation is not supported for this sequence type."); } - const char* data = reinterpret_cast(content.data()); + const char* data = reinterpret_cast(content.data()); std::size_t size = content.size() * sizeof(content[0]); return std::hash{}(std::string_view(data, size)); } From fd7b11dd26b92367cfa65a3f01015d0749c5784d Mon Sep 17 00:00:00 2001 From: Anastasiia Pnevskaia Date: Tue, 4 Mar 2025 17:12:35 +0100 Subject: [PATCH 7/7] Minor correction. --- src/cpp/src/sequence_group.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cpp/src/sequence_group.cpp b/src/cpp/src/sequence_group.cpp index 9c5b31a6e7..93bf34b494 100644 --- a/src/cpp/src/sequence_group.cpp +++ b/src/cpp/src/sequence_group.cpp @@ -66,7 +66,6 @@ size_t Sequence::_make_hash(size_t content_length) { } std::vector Sequence::_reduce_embedding(const std::vector& embedding) { - size_t s = embedding.size(); size_t res_size = std::min((size_t)ceil(float(embedding.size()) / m_embeddings_hash_calculation_stride), m_embeddings_hash_max_num_values); std::vector res(res_size); for (size_t i = 0, idx=0; idx < res_size; i+= m_embeddings_hash_calculation_stride, idx++) {