2015-07-09 2 views
1

У меня есть tcpdumps (.pcap) файлы захваченных пакетов с миллионами пакетов. Мне нужно сгруппировать эти сетевые пакеты в потоки TCP.cuda упорный подход к группировке пакетов в потоке tcp

Пример: Рассмотрим следующие пакеты нет => source_ip, destination_ip, source_port, destination_port

1 => ip1, ф2, s1, s2

2 => ip1, IP3, s3 , s4

3 => ф2, IP1, s2, s1

4 => IP3, IP1, s4, s3

в настоящее время в выше примера из четырех пакетов, пакеты 1,3 и 2,4 представляют собой пакеты одного и того же потока. i.e мне нужно разрешить следующие пакеты как [[1,3], [2,4]].

мой подход:

С (ip1, ф2, s1, s2) и (ф2, ip1, s2, s1) показывает тот же поток, так что я решил хэш оба и назовите его forward_hash и обратный хэш поскольку они обозначают пакеты одного и того же потока, протекающие в противоположных направлениях.

Я использую индексный массив для отслеживания пакетов во время замены и сортировки. После окончательной сортировки, начальные и окончание же хешей извлекаются и используются против индекса массива, чтобы получить индексы пакетов, которые представляют этот поток

keys is the forward_hash of each packets, 
count is number of packets, 
packet_ids is the id of each packet corresponding to each of the hash 

    thrust::device_vector<unsigned long long> d_keys(keys,(keys+count)); 
      thrust::device_vector<unsigned long long> d_ids(packet_ids,(packet_ids+count)); 
      // now sort the ids according to the keys 
      thrust::sort_by_key(d_keys.begin(), d_keys.end(), d_ids.begin()); 
// after sorting, now we need to find the index of each hash 
thrust::device_vector<unsigned long long> u_keys(count); 
     thrust::device_vector<unsigned long long> output(count); 

     thrust::pair<thrust::device_vector<unsigned long long>::iterator, thrust::device_vector<unsigned long long>::iterator> new_end; 
     new_end = thrust::reduce_by_key(d_keys.begin(), d_keys.end(),thrust::make_constant_iterator(1),u_keys.begin(),output.begin()); 
// now we need to find starting index to each hash 
.... 

Я пытался реализовать хэш-таблицу для поиска в уникальном прямом и обратной хэш, но для замены каждого реверсивного хеша с передним хешем перед сортировкой ... но довольно медленным по производительности. I Любая помощь?

Благодаря

ответ

1

Я предлагаю другой подход, который первые виды в пределах каждый пакет, а затем сортируют пакеты.

Пример кода выполняет следующие действия:

  1. Для того, чтобы идентифицировать пакеты одного и того же потока TCP, нам нужно сортировать пакеты. Прежде чем мы сможем это сделать, мы должны убедиться, что в пределах отсортированы по каждому отправленному источнику и месту назначения. Пример: 20:1 -> 10:4 становится 10:4 -> 20:1

  2. Теперь мы можем сортировать пакеты так, сгруппированных пакеты одного и того же потока. Этот код предполагает, что входные пакеты сортируются по времени. Мы применяем сортировку , чтобы сохранить сортировку в каждом потоке.

  3. Нам нужно выяснить, где начинается каждый поток TCP. Результатом этого шага являются индексы, указывающие на начало потока TCP в отсортированном списке пакетов.

  4. В зависимости от того, как вам нужен результат, мы можем генерировать дополнительную информацию о потоках, такую ​​как количество пакетов в потоке.

Возможные улучшения:

Если вы знаете, что IP-адрес только в определенном ограниченном диапазоне, они могут быть представлены с использованием только 16 бит. Тогда вы могли бы сжать адрес отправителя, порт отправителя, адрес получателя, порт приемника в 64-битное целое число, что улучшит производительность сортировки.


компиляции и запуска

nvcc -std=c++11 sort_packets.cu -o sort_packets && ./sort_packets 

выход

input data 
d_src_addr: 20 10 20 20 30 30 10 20 30 20 
d_src_port: 1 2 3 1 2 2 6 1 1 1 
d_dst_addr: 10 20 30 10 20 20 30 10 10 10 
d_dst_port: 4 2 3 4 5 5 1 4 6 4 

packets after sort_within_packet 
d_src_addr: 10 10 20 10 20 20 10 10 10 10 
d_src_port: 4 2 3 4 5 5 6 4 6 4 
d_dst_addr: 20 20 30 20 30 30 30 20 30 20 
d_dst_port: 1 2 3 1 2 2 1 1 1 1 

after stable_sort 
d_orig_ind: 1 0 3 7 9 6 8 2 4 5 

packets after stable_sort 
d_src_addr: 10 10 10 10 10 10 10 20 20 20 
d_src_port: 2 4 4 4 4 6 6 3 5 5 
d_dst_addr: 20 20 20 20 20 30 30 30 30 30 
d_dst_port: 2 1 1 1 1 1 1 3 2 2 

after copy_if 
d_start_indices: 0 1 5 7 8 
d_stream_lengths: 1 4 2 1 2 

group of streams referencing the original indices 
[1] [0,3,7,9] [6,8] [2] [4,5] 

sort_packets.cu

#include <stdint.h> 
#include <iostream> 
#include <thrust/device_vector.h> 
#include <thrust/iterator/zip_iterator.h> 
#include <thrust/iterator/transform_iterator.h> 
#include <thrust/iterator/counting_iterator.h> 
#include <thrust/sort.h> 
#include <thrust/sequence.h> 
#include <thrust/copy.h> 
#include <thrust/functional.h> 
#include <thrust/adjacent_difference.h> 
#include <thrust/scatter.h> 


#define PRINTER(name) print(#name, (name)) 
template <template <typename...> class V, typename T, typename ...Args> 
void print(const char* name, const V<T,Args...> & v) 
{ 
    std::cout << name << ":\t"; 
    thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t")); 
    std::cout << std::endl; 
} 

typedef thrust::tuple<uint32_t, uint16_t, uint32_t, uint16_t> Packet; 

struct sort_within_packet : public thrust::unary_function<Packet, Packet> 
{ 
    __host__ __device__ 
    Packet operator()(Packet p) const 
    { 
     if (thrust::get<0>(p) > thrust::get<2>(p)) 
     { 
      Packet copy(p); 
      thrust::get<0>(p) = thrust::get<2>(copy); 
      thrust::get<1>(p) = thrust::get<3>(copy); 
      thrust::get<2>(p) = thrust::get<0>(copy); 
      thrust::get<3>(p) = thrust::get<1>(copy); 
     } 
     return p; 
    } 
}; 

struct find_start_indices : public thrust::unary_function<thrust::tuple<Packet, Packet>, bool> 
{ 
    __host__ __device__ 
    bool operator()(thrust::tuple<Packet, Packet> p) 
    { 
     return (thrust::get<0>(p) != thrust::get<1>(p)); 
    } 
}; 

template<typename... Iterators> 
__host__ __device__ 
thrust::zip_iterator<thrust::tuple<Iterators...>> zip(Iterators... its) 
{ 
    return thrust::make_zip_iterator(thrust::make_tuple(its...)); 
} 


int main() 
{ 
    // in this example we just have 10 packets 
    const int N = 10; 

    // demo data 
    // this example uses very simple "IP addresses" 
    uint32_t srcAddrArray[N] = {20, 10, 20, 20, 30, 30, 10, 20, 30, 20}; 
    uint16_t srcPortArray[N] = {1 , 2 , 3 , 1 , 2 , 2 , 6 , 1 , 1 , 1 }; 

    uint32_t dstAddrArray[N] = {10, 20, 30, 10, 20, 20, 30, 10, 10, 10}; 
    uint16_t dstPortArray[N] = {4 , 2 , 3 , 4 , 5 , 5 , 1 , 4 , 6 , 4 }; 

    // upload data to GPU 
    thrust::device_vector<uint32_t> d_src_addr(srcAddrArray, srcAddrArray+N); 
    thrust::device_vector<uint16_t> d_src_port(srcPortArray, srcPortArray+N); 

    thrust::device_vector<uint32_t> d_dst_addr(dstAddrArray, dstAddrArray+N); 
    thrust::device_vector<uint16_t> d_dst_port(dstPortArray, dstPortArray+N); 

    thrust::device_vector<uint32_t> d_orig_ind(N); 
    thrust::sequence(d_orig_ind.begin(), d_orig_ind.end()); 

    std::cout << "input data" << std::endl; 
    PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl; 

    // 1. sort within packet 
    auto zip_begin = zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin()); 
    auto zip_end = zip(d_src_addr.end(), d_src_port.end(), d_dst_addr.end(), d_dst_port.end()); 
    thrust::transform(zip_begin, zip_end, zip_begin, sort_within_packet()); 

    std::cout << "packets after sort_within_packet" << std::endl; 
    PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl; 

    // 2. sort packets 
    thrust::stable_sort(zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin(), d_orig_ind.begin()), 
         zip(d_src_addr.end(), d_src_port.end(), d_dst_addr.end(), d_dst_port.end(), d_orig_ind.end())); 

    std::cout << "after stable_sort" << std::endl; 
    PRINTER(d_orig_ind); std::cout << std::endl; 

    std::cout << "packets after stable_sort" << std::endl; 
    PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl; 

    // 3. find stard indices of each stream 
    thrust::device_vector<uint32_t> d_start_indices(N); 

    using namespace thrust::placeholders; 
    thrust::device_vector<uint32_t>::iterator copyEnd = thrust::copy_if(thrust::make_counting_iterator(1), thrust::make_counting_iterator(N), 
                      thrust::make_transform_iterator(
                       zip(
                        zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin()), 
                        zip(d_src_addr.begin()+1, d_src_port.begin()+1, d_dst_addr.begin()+1, d_dst_port.begin()+1) 
                       ), 
                       find_start_indices() 
                      ), 
                      d_start_indices.begin()+1, _1); 

    uint32_t streamCount = copyEnd-d_start_indices.begin(); 
    d_start_indices.resize(streamCount); 

    std::cout << "after copy_if" << std::endl; 
    PRINTER(d_start_indices); 

    // 4. generate some additional information about the result and print result formatted 
    thrust::device_vector<uint32_t> d_stream_lengths(streamCount+1); 
    thrust::adjacent_difference(d_start_indices.begin(), d_start_indices.end(), d_stream_lengths.begin()); 
    d_stream_lengths.erase(d_stream_lengths.begin()); 
    d_stream_lengths.back() = N-d_start_indices.back(); 
    PRINTER(d_stream_lengths); 

    thrust::host_vector<uint32_t> h_start_indices = d_start_indices; 
    thrust::host_vector<uint32_t> h_orig_ind = d_orig_ind; 

    auto index = h_start_indices.begin(); 
    index++; 

    std::cout << std::endl << "group of streams referencing the original indices"<< std::endl << "[" << h_orig_ind[0]; 
    for(int i=1; i<N;++i) 
    { 
     if (i == *index) 
     { 
     index++; 
     std::cout << "]\t["; 
     } 
     else 
     { 
     std::cout << ","; 
     } 
     std::cout << h_orig_ind[i]; 
    } 
    std::cout << "]" << std::endl; 

    return 0; 
} 
+0

вы спасатель .... Постараюсь это и see..thanks – rawcoder

+0

Это держит меня дает эту ошибку ошибку: не экземпляр шаблона функции «молнии» соответствует списку аргументов ?? – rawcoder

+0

@rawcoder для этого кода требуется C++ 11, см. ** компиляция и запуск ** выше. это будет работать с CUDA7 и недавним компилятором, таким как GCC 4.8 –

Смежные вопросы