2010-07-06 5 views
4

Я хочу обработать некоторые данные. У меня около 25 тысяч предметов в словаре. В цикле foreach я запрашиваю базу данных для получения результатов по этому элементу. Они добавляются как ценность для Словаря.Многопоточность на петле foreach?

foreach (KeyValuePair<string, Type> pair in allPeople) 
{ 
    MySqlCommand comd = new MySqlCommand("SELECT * FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con); 
    MySqlDataReader reader2 = comd.ExecuteReader(); 
    Dictionary<string, Dictionary<int, Log>> allViews = new Dictionary<string, Dictionary<int, Log>>(); 
    while (reader2.Read()) 
    { 
     if (!allViews.ContainsKey(reader2.GetString("src"))) 
     { 
      allViews.Add(reader2.GetString("src"), reader2.GetInt32("time")); 
     } 
    } 
    reader2.Close(); 
    reader2.Dispose(); 
    allPeople[pair.Key].View = allViews; 
} 

Я надеялся, что смогу сделать это быстрее с помощью многопоточности. У меня есть 8 потоков, а использование ЦП составляет около 13%. Я просто не знаю, будет ли это работать, потому что он полагается на сервер MySQL. С другой стороны, возможно, 8 потоков откроют 8 соединений БД и, следовательно, будут быстрее.

В любом случае, если многопоточность поможет в моем случае, как? oO Я никогда не работал с (несколькими) потоками, поэтому любая помощь была бы замечательной: D

+1

Вы профилированного это - то, что нашли время, и как долго? например вы проводите большую часть времени на db, или в фактическом Содержит/Добавить в Словарь – nos

+0

Для выполнения этого требуется 18 минут, на частоте 2,9 ГГц. Дело в том, что 25k - это всего лишь тест в частично скопированной БД. Реальная вещь имеет гораздо больше. – lordstyx

+2

Я рекомендую посмотреть новые классы параллельных вычислений, добавленные в платформу .NET 4.0. http://msdn.microsoft.com/en-gb/concurrency/bb895950.aspx. – btlog

ответ

3

Самая большая проблема, которая приходит на ум, заключается в том, что вы собираетесь использовать многопоточность для добавления значений в словарь, что не является безопасный поток.

Вам нужно что-то сделать like, чтобы это сработало, и вы не можете получить такую ​​выгоду от его реализации, поскольку это все равно должно блокировать объект словаря, чтобы добавить значение.

+0

Ну, словарь, который вы видите, является лишь временным. В позиции в главном словаре значение изменяется в этом цикле с временным словарем. Никакие записи из основного словаря не удаляются или не добавляются. Только изменено. Я думал, что сделать его многопоточным, чтобы 8 потоков обрабатывали первые 8 словаря, и когда один поток заканчивается, он принимает запись 9 и т. Д. И т. Д. – lordstyx

+0

Проблема здесь - это словарь allViews. См., Если было два потока, которые пытались одновременно писать в allviews без блокировки, которая создала бы проблему. Хотя вы можете использовать блокировки, чтобы обойти эту проблему, я не уверен, какую выгоду вы получите от нее. При этом я должен согласиться с заявлением Андрея о читателе - читатель2 является настоящим виновником. Есть обходные пути для большинства других вещей, но не для читателя2. – diadem

+0

Многопоточные работы, однако вы говорите ему об этом. Как правило, все происходит одновременно, что может привести к возникновению собственных проблем. Без гарантий это как пересечение без светофора. – diadem

5

MySqlDataReader - это состояние - вы вызываете на него Read(), и он перемещается в следующую строку, поэтому каждый поток нуждается в собственном читателе, и вам нужно придумать запрос, чтобы получить разные значения. Это может быть не слишком сложно, поскольку у вас, естественно, много запросов с разными значениями пары.

Вы также должны либо иметь словарь тем для каждого потока, либо затем объединить их, либо использовать блокировку для предотвращения параллельной модификации словаря.

Вышеприведенное предполагает, что MySQL позволит одному соединению выполнять одновременные запросы; иначе вам может понадобиться несколько подключений.

Во-первых, хотя, я хотел бы видеть, что происходит, если вы только попросите базы данных для данных, которые необходимы ("SELECT src,time FROM журналы WHERE IP = '" + pair.Key + "' GROUP BY src") и использовать GetString (0) и GetInt32 (1) вместо того, чтобы использовать имена, чтобы посмотреть СРК и время; также получают значения только один раз из результата.

Я также не уверен в логике - вы не заказываете события журнала по времени, поэтому первый из них (и так хранится в словаре) может быть любым из них.

Что-то вроде этой логики - где каждые из N потоков работает только на N й пары, каждая нить имеет свой собственный читатель, и ничего на самом деле не меняет allPeople только свойство значений в allPeople:

private void RunSubQuery(Dictionary<string, Type> allPeople, MySqlConnection con, int threadNumber, int threadCount) 
    { 
     int hoppity = 0; // used to hop over the keys not processed by this thread 

     foreach (var pair in allPeople) 
     { 
      // each of the (threadCount) threads only processes the (threadCount)th key 
      if ((hoppity % threadCount) == threadNumber) 
      { 
       // you may need con per thread, or it might be that you can share con; I don't know 
       MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con); 

       using (MySqlDataReader reader = comd.ExecuteReader()) 
       { 
        var allViews = new Dictionary<string, Dictionary<int, Log>>(); 

        while (reader.Read()) 
        { 
         string src = reader.GetString(0); 
         int time = reader.GetInt32(1); 

         // do whatever to allViews with src and time 
        } 

        // no thread will be modifying the same pair.Value, so this is safe 
        pair.Value.View = allViews; 
       } 
      } 

      ++hoppity; 
     } 
    } 

Это не проверено. У меня нет MySQL на этой машине, и у меня нет базы данных и других типов, которые вы используете. Это также довольно процедурно (как бы вы это делали в Fortran с OpenMPI), а не обертывать все объекты задач.

Вы можете начать темы для этого вот так:

void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection) 
    { 
     lock (allPeople) 
     { 
      const int threadCount = 8; // the number of threads 

      // if it takes 18 seconds currently and you're not at .net 4 yet, then you may as well create 
      // the threads here as any saving of using a pool will not matter against 18 seconds 
      // 
      // it could be more efficient to use a pool so that each thread takes a pair off of 
      // a queue, as doing it this way means that each thread has the same number of pairs to process, 
      // and some pairs might take longer than others 
      Thread[] threads = new Thread[threadCount]; 

      for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber) 
      { 
       threads[threadNumber] = new Thread(new ThreadStart(() => RunSubQuery(allPeople, connection, threadNumber, threadCount))); 
       threads[threadNumber].Start(); 
      } 

      // wait for all threads to finish 
      for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber) 
      { 
       threads[threadNumber].Join(); 
      } 
     } 
    } 

Лишней блокировки, на allPeople делаются для того, что есть записи барьер после возврата всех потоков; Я не совсем уверен, если это необходимо. Любой объект будет делать.

Ничто в этом не гарантирует прироста производительности - возможно, библиотеки MySQL однопоточные, но сервер, безусловно, может обрабатывать несколько соединений. Измерьте с различным количеством потоков.


Если вы используете .NET 4, то вам не придется возиться созданием темы или пропуска элементов, которые вы не работает:

// this time using .net 4 parallel; assumes that connection is thread safe 
    static void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection) 
    { 
     Parallel.ForEach(allPeople, pair => RunPairQuery(pair, connection)); 
    } 

    private static void RunPairQuery(KeyValuePair<string, Type> pair, MySqlConnection connection) 
    { 
     MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", connection); 

     using (MySqlDataReader reader = comd.ExecuteReader()) 
     { 
      var allViews = new Dictionary<string, Dictionary<int, Log>>(); 

      while (reader.Read()) 
      { 
       string src = reader.GetString(0); 
       int time = reader.GetInt32(1); 

       // do whatever to allViews with src and time 
      } 

      // no iteration will be modifying the same pair.Value, so this is safe 
      pair.Value.View = allViews; 
     } 
    } 
+1

Пожалуйста, обратите внимание, что сегментирование операторов sql может или не может добавить дополнительное снижение производительности - тестирование должно быть выполнено. – diadem

+0

В настоящее время я тестирую запрос, который обрабатывает только те поля, которые мне нужны. – lordstyx

+0

Хорошо, когда я только запрашиваю в полях, которые мне нужны, это занимает 18 минут. Это всего лишь несколько мс быстрее. Я попробую ваш код, у меня есть один вопрос, хотя xD я понятия не имею, как «// запускать это в каждом потоке с последующими значениями threadNumber»; Не могли бы вы объяснить? Как я уже сказал, я совершенно новичок в многопоточности. – lordstyx

0

Я думаю, что если вы работают на многоядерной машине, вы можете получить преимущества от многопоточности.

Однако, как бы я ни хотел, нужно сначала разблокировать поток, который вы используете, делая асинхронные вызовы базы данных. Обратные обратные вызовы будут выполняться в фоновом потоке, так что вы получите несколько преимуществ для нескольких ящиков, и вы не будете блокировать потоки, ожидающие возвращения db.

Для интенсивных приложений ввода-вывода, подобных этому примеру, похоже, что вы, вероятно, увидите улучшенную пропускную способность в зависимости от того, какую нагрузку может обрабатывать db. Предполагая, что шкалы db обрабатывают несколько одновременных запросов, вы должны быть хорошими.

1

Прежде чем что-либо сделать, узнайте, где именно проводится время. Проверьте план выполнения запроса. Первое, что я подозреваю, это отсутствие индекса в logs.IP.

18 минут для чего-то вроде этого кажется слишком длинным для меня. Даже если вы можете сократить время выполнения в восемь, добавив больше потоков (что маловероятно!), Вы все равно в конечном итоге используете более 2 минут. Вероятно, вы могли бы прочитать целые 25k строк в памяти менее чем за пять секунд и выполнить необходимую обработку в памяти ...

EDIT: Просто для того, чтобы уточнить, я не выступаю за то, чтобы делать это в памяти, просто говоря, что это похоже, что здесь есть более узкое место, которое можно удалить.

0

Спасибо всем за помощь. В настоящее время я использую это

for (int i = 0; i < 8; i++) 
{ 
    ThreadPool.QueueUserWorkItem(addDistinctScres, i); 
} 

ThreadPool для запуска всех потоков. Я использую метод, предоставленный Питом Киркхэмом, и я создаю новое соединение на поток. Время сократилось до 4 минут.

Далее Я сделаю что-нибудь, ждущего обратного вызова потока? перед выполнением других функций.

Я думаю, что узким местом является сервер MySQL, потому что загрузка процессора падает.

@odd parity Я думал об этом, но реальная вещь - waaay более 25 тысяч строк. Idk, если это сработает.

+0

Хе-хе, я не имел в виду, что вы должны _actually_ загрузить всю таблицу в память, просто, если вы сравните время, которое «идеальный» подход займет с текущим временем, это очевидно, что есть узкое место где-то, были рассмотрены. Все, что я говорю, это то, что ваше время, вероятно, будет лучше потрачено на то, чтобы найти это узкое место, чем оптимизировать текущий код. –

+0

О, ладно. Ну, после удаления узкого места с одним потоком (теперь с использованием 8: D) я думаю, что новым узким местом является база данных, так как использование ЦП имеет пиковые значения в процессе. Я не знаю, что еще может быть. – lordstyx

1

Предположение:

  1. Там расположена таблица Люди в базе данных
  2. Есть много людей в вашей базы данных

Каждый запрос к базе данных добавляет накладные расходы вы делаете один дб запроса для каждого из людей в вашей базе данных я бы предположил, что быстрее было вернуть все данные в один запрос, а затем повторить вызовы

select l.ip,l.time,l.src 
    from logs l, people p 
    where l.ip = p.ip 
    group by l.ip, l.src 

Попробуйте это с петлей в одном потоке, я верю, что это будет намного быстрее, чем ваш существующий код.

С помощью существующего кода вы можете сделать вывод о том, что вы создали MySqlCommand из цикла, подготовьте его заранее и просто измените параметр. Это должно ускорить выполнение SQL. см http://dev.mysql.com/doc/refman/5.0/es/connector-net-examples-mysqlcommand.html#connector-net-examples-mysqlcommand-prepare

MySqlCommand comd = new MySqlCommand("SELECT * FROM `logs` WHERE IP = ?key GROUP BY src", con); 
comd.prepare(); 
comd.Parameters.Add("?key","example"); 
foreach (KeyValuePair<string, Type> pair in allPeople) 
{ 
    comd.Parameters[0].Value = pair.Key; 

Если вы используете Mutiple потоков, каждый поток будет по-прежнему нужна там собственная команда, в Lest в MS-SQL это все равно будет быстрее, даже если вы воссозданы и подготовили Постулаты каждый раз, из-за возможность для SQL-сервера иметь возможность кэшировать план выполнения параметризированного статута.

+0

Нет, есть одна таблица со столбцами IP | время | src Я на самом деле пытаюсь сделать что-то, что проанализирует эти журналы и найдет в них спам/шаблоны. Это не слишком серьезно, и мне просто интересно, как я могу загрузить его быстрее: P – lordstyx

+0

@lordstyx в вашем сообщении, откуда приходит переменная allPeople и сколько элементов в ней? –

+0

allPeople - это глобальный словарь с ключом IP, а значения присваиваются в этой функции. IP-адреса получаются в функции diff. – lordstyx

Смежные вопросы