Я предлагаю другой подход, который первые виды в пределах каждый пакет, а затем сортируют пакеты.
Пример кода выполняет следующие действия:
Для того, чтобы идентифицировать пакеты одного и того же потока TCP, нам нужно сортировать пакеты. Прежде чем мы сможем это сделать, мы должны убедиться, что в пределах отсортированы по каждому отправленному источнику и месту назначения. Пример: 20:1 -> 10:4
становится 10:4 -> 20:1
Теперь мы можем сортировать пакеты так, сгруппированных пакеты одного и того же потока. Этот код предполагает, что входные пакеты сортируются по времени. Мы применяем сортировку , чтобы сохранить сортировку в каждом потоке.
Нам нужно выяснить, где начинается каждый поток TCP. Результатом этого шага являются индексы, указывающие на начало потока TCP в отсортированном списке пакетов.
В зависимости от того, как вам нужен результат, мы можем генерировать дополнительную информацию о потоках, такую как количество пакетов в потоке.
Возможные улучшения:
Если вы знаете, что IP-адрес только в определенном ограниченном диапазоне, они могут быть представлены с использованием только 16 бит. Тогда вы могли бы сжать адрес отправителя, порт отправителя, адрес получателя, порт приемника в 64-битное целое число, что улучшит производительность сортировки.
компиляции и запуска
nvcc -std=c++11 sort_packets.cu -o sort_packets && ./sort_packets
выход
input data
d_src_addr: 20 10 20 20 30 30 10 20 30 20
d_src_port: 1 2 3 1 2 2 6 1 1 1
d_dst_addr: 10 20 30 10 20 20 30 10 10 10
d_dst_port: 4 2 3 4 5 5 1 4 6 4
packets after sort_within_packet
d_src_addr: 10 10 20 10 20 20 10 10 10 10
d_src_port: 4 2 3 4 5 5 6 4 6 4
d_dst_addr: 20 20 30 20 30 30 30 20 30 20
d_dst_port: 1 2 3 1 2 2 1 1 1 1
after stable_sort
d_orig_ind: 1 0 3 7 9 6 8 2 4 5
packets after stable_sort
d_src_addr: 10 10 10 10 10 10 10 20 20 20
d_src_port: 2 4 4 4 4 6 6 3 5 5
d_dst_addr: 20 20 20 20 20 30 30 30 30 30
d_dst_port: 2 1 1 1 1 1 1 3 2 2
after copy_if
d_start_indices: 0 1 5 7 8
d_stream_lengths: 1 4 2 1 2
group of streams referencing the original indices
[1] [0,3,7,9] [6,8] [2] [4,5]
sort_packets.cu
#include <stdint.h>
#include <iostream>
#include <thrust/device_vector.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/sort.h>
#include <thrust/sequence.h>
#include <thrust/copy.h>
#include <thrust/functional.h>
#include <thrust/adjacent_difference.h>
#include <thrust/scatter.h>
#define PRINTER(name) print(#name, (name))
template <template <typename...> class V, typename T, typename ...Args>
void print(const char* name, const V<T,Args...> & v)
{
std::cout << name << ":\t";
thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
std::cout << std::endl;
}
typedef thrust::tuple<uint32_t, uint16_t, uint32_t, uint16_t> Packet;
struct sort_within_packet : public thrust::unary_function<Packet, Packet>
{
__host__ __device__
Packet operator()(Packet p) const
{
if (thrust::get<0>(p) > thrust::get<2>(p))
{
Packet copy(p);
thrust::get<0>(p) = thrust::get<2>(copy);
thrust::get<1>(p) = thrust::get<3>(copy);
thrust::get<2>(p) = thrust::get<0>(copy);
thrust::get<3>(p) = thrust::get<1>(copy);
}
return p;
}
};
struct find_start_indices : public thrust::unary_function<thrust::tuple<Packet, Packet>, bool>
{
__host__ __device__
bool operator()(thrust::tuple<Packet, Packet> p)
{
return (thrust::get<0>(p) != thrust::get<1>(p));
}
};
template<typename... Iterators>
__host__ __device__
thrust::zip_iterator<thrust::tuple<Iterators...>> zip(Iterators... its)
{
return thrust::make_zip_iterator(thrust::make_tuple(its...));
}
int main()
{
// in this example we just have 10 packets
const int N = 10;
// demo data
// this example uses very simple "IP addresses"
uint32_t srcAddrArray[N] = {20, 10, 20, 20, 30, 30, 10, 20, 30, 20};
uint16_t srcPortArray[N] = {1 , 2 , 3 , 1 , 2 , 2 , 6 , 1 , 1 , 1 };
uint32_t dstAddrArray[N] = {10, 20, 30, 10, 20, 20, 30, 10, 10, 10};
uint16_t dstPortArray[N] = {4 , 2 , 3 , 4 , 5 , 5 , 1 , 4 , 6 , 4 };
// upload data to GPU
thrust::device_vector<uint32_t> d_src_addr(srcAddrArray, srcAddrArray+N);
thrust::device_vector<uint16_t> d_src_port(srcPortArray, srcPortArray+N);
thrust::device_vector<uint32_t> d_dst_addr(dstAddrArray, dstAddrArray+N);
thrust::device_vector<uint16_t> d_dst_port(dstPortArray, dstPortArray+N);
thrust::device_vector<uint32_t> d_orig_ind(N);
thrust::sequence(d_orig_ind.begin(), d_orig_ind.end());
std::cout << "input data" << std::endl;
PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl;
// 1. sort within packet
auto zip_begin = zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin());
auto zip_end = zip(d_src_addr.end(), d_src_port.end(), d_dst_addr.end(), d_dst_port.end());
thrust::transform(zip_begin, zip_end, zip_begin, sort_within_packet());
std::cout << "packets after sort_within_packet" << std::endl;
PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl;
// 2. sort packets
thrust::stable_sort(zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin(), d_orig_ind.begin()),
zip(d_src_addr.end(), d_src_port.end(), d_dst_addr.end(), d_dst_port.end(), d_orig_ind.end()));
std::cout << "after stable_sort" << std::endl;
PRINTER(d_orig_ind); std::cout << std::endl;
std::cout << "packets after stable_sort" << std::endl;
PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl;
// 3. find stard indices of each stream
thrust::device_vector<uint32_t> d_start_indices(N);
using namespace thrust::placeholders;
thrust::device_vector<uint32_t>::iterator copyEnd = thrust::copy_if(thrust::make_counting_iterator(1), thrust::make_counting_iterator(N),
thrust::make_transform_iterator(
zip(
zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin()),
zip(d_src_addr.begin()+1, d_src_port.begin()+1, d_dst_addr.begin()+1, d_dst_port.begin()+1)
),
find_start_indices()
),
d_start_indices.begin()+1, _1);
uint32_t streamCount = copyEnd-d_start_indices.begin();
d_start_indices.resize(streamCount);
std::cout << "after copy_if" << std::endl;
PRINTER(d_start_indices);
// 4. generate some additional information about the result and print result formatted
thrust::device_vector<uint32_t> d_stream_lengths(streamCount+1);
thrust::adjacent_difference(d_start_indices.begin(), d_start_indices.end(), d_stream_lengths.begin());
d_stream_lengths.erase(d_stream_lengths.begin());
d_stream_lengths.back() = N-d_start_indices.back();
PRINTER(d_stream_lengths);
thrust::host_vector<uint32_t> h_start_indices = d_start_indices;
thrust::host_vector<uint32_t> h_orig_ind = d_orig_ind;
auto index = h_start_indices.begin();
index++;
std::cout << std::endl << "group of streams referencing the original indices"<< std::endl << "[" << h_orig_ind[0];
for(int i=1; i<N;++i)
{
if (i == *index)
{
index++;
std::cout << "]\t[";
}
else
{
std::cout << ",";
}
std::cout << h_orig_ind[i];
}
std::cout << "]" << std::endl;
return 0;
}
вы спасатель .... Постараюсь это и see..thanks – rawcoder
Это держит меня дает эту ошибку ошибку: не экземпляр шаблона функции «молнии» соответствует списку аргументов ?? – rawcoder
@rawcoder для этого кода требуется C++ 11, см. ** компиляция и запуск ** выше. это будет работать с CUDA7 и недавним компилятором, таким как GCC 4.8 –