MySqlDataReader
- это состояние - вы вызываете на него Read()
, и он перемещается в следующую строку, поэтому каждый поток нуждается в собственном читателе, и вам нужно придумать запрос, чтобы получить разные значения. Это может быть не слишком сложно, поскольку у вас, естественно, много запросов с разными значениями пары.
Вы также должны либо иметь словарь тем для каждого потока, либо затем объединить их, либо использовать блокировку для предотвращения параллельной модификации словаря.
Вышеприведенное предполагает, что MySQL позволит одному соединению выполнять одновременные запросы; иначе вам может понадобиться несколько подключений.
Во-первых, хотя, я хотел бы видеть, что происходит, если вы только попросите базы данных для данных, которые необходимы ("SELECT src,time FROM
журналы WHERE IP = '" + pair.Key + "' GROUP BY src"
) и использовать GetString (0) и GetInt32 (1) вместо того, чтобы использовать имена, чтобы посмотреть СРК и время; также получают значения только один раз из результата.
Я также не уверен в логике - вы не заказываете события журнала по времени, поэтому первый из них (и так хранится в словаре) может быть любым из них.
Что-то вроде этой логики - где каждые из N потоков работает только на N й пары, каждая нить имеет свой собственный читатель, и ничего на самом деле не меняет allPeople
только свойство значений в allPeople
:
private void RunSubQuery(Dictionary<string, Type> allPeople, MySqlConnection con, int threadNumber, int threadCount)
{
int hoppity = 0; // used to hop over the keys not processed by this thread
foreach (var pair in allPeople)
{
// each of the (threadCount) threads only processes the (threadCount)th key
if ((hoppity % threadCount) == threadNumber)
{
// you may need con per thread, or it might be that you can share con; I don't know
MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);
using (MySqlDataReader reader = comd.ExecuteReader())
{
var allViews = new Dictionary<string, Dictionary<int, Log>>();
while (reader.Read())
{
string src = reader.GetString(0);
int time = reader.GetInt32(1);
// do whatever to allViews with src and time
}
// no thread will be modifying the same pair.Value, so this is safe
pair.Value.View = allViews;
}
}
++hoppity;
}
}
Это не проверено. У меня нет MySQL на этой машине, и у меня нет базы данных и других типов, которые вы используете. Это также довольно процедурно (как бы вы это делали в Fortran с OpenMPI), а не обертывать все объекты задач.
Вы можете начать темы для этого вот так:
void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
{
lock (allPeople)
{
const int threadCount = 8; // the number of threads
// if it takes 18 seconds currently and you're not at .net 4 yet, then you may as well create
// the threads here as any saving of using a pool will not matter against 18 seconds
//
// it could be more efficient to use a pool so that each thread takes a pair off of
// a queue, as doing it this way means that each thread has the same number of pairs to process,
// and some pairs might take longer than others
Thread[] threads = new Thread[threadCount];
for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
{
threads[threadNumber] = new Thread(new ThreadStart(() => RunSubQuery(allPeople, connection, threadNumber, threadCount)));
threads[threadNumber].Start();
}
// wait for all threads to finish
for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
{
threads[threadNumber].Join();
}
}
}
Лишней блокировки, на allPeople делаются для того, что есть записи барьер после возврата всех потоков; Я не совсем уверен, если это необходимо. Любой объект будет делать.
Ничто в этом не гарантирует прироста производительности - возможно, библиотеки MySQL однопоточные, но сервер, безусловно, может обрабатывать несколько соединений. Измерьте с различным количеством потоков.
Если вы используете .NET 4, то вам не придется возиться созданием темы или пропуска элементов, которые вы не работает:
// this time using .net 4 parallel; assumes that connection is thread safe
static void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
{
Parallel.ForEach(allPeople, pair => RunPairQuery(pair, connection));
}
private static void RunPairQuery(KeyValuePair<string, Type> pair, MySqlConnection connection)
{
MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", connection);
using (MySqlDataReader reader = comd.ExecuteReader())
{
var allViews = new Dictionary<string, Dictionary<int, Log>>();
while (reader.Read())
{
string src = reader.GetString(0);
int time = reader.GetInt32(1);
// do whatever to allViews with src and time
}
// no iteration will be modifying the same pair.Value, so this is safe
pair.Value.View = allViews;
}
}
Вы профилированного это - то, что нашли время, и как долго? например вы проводите большую часть времени на db, или в фактическом Содержит/Добавить в Словарь – nos
Для выполнения этого требуется 18 минут, на частоте 2,9 ГГц. Дело в том, что 25k - это всего лишь тест в частично скопированной БД. Реальная вещь имеет гораздо больше. – lordstyx
Я рекомендую посмотреть новые классы параллельных вычислений, добавленные в платформу .NET 4.0. http://msdn.microsoft.com/en-gb/concurrency/bb895950.aspx. – btlog