Форум программистов
 

Восстановите пароль или Зарегистрируйтесь на форуме, о проблемах и с заказом рекламы пишите сюда - alarforum@yandex.ru, проверяйте папку спам!

Вернуться   Форум программистов > .NET Frameworks (точка нет фреймворки) > C# (си шарп)
Регистрация

Восстановить пароль
Повторная активизация e-mail

Купить рекламу на форуме - 42 тыс руб за месяц

Ответ
 
Опции темы Поиск в этой теме
Старый 14.02.2013, 22:57   #1
fshlik
Новичок
Джуниор
 
Регистрация: 14.02.2013
Сообщений: 3
Вопрос Thread и Queue: пересылка данных из потока в поток

Доброе время суток!
****************
Была задача по пересылке данных из одного потока в другой
вот решил его через Queue, все работает, но только вот
ресурсов отжирает очень много, целое ядро грузит программа.
А вся проблема, на мой взгляд, это организация FIFO через Queue(далее очередь), видимо не предусмотрено ее использовать в таком режиме.

Алгоритм:в первом потоке я в очередь постоянно записываю массив, а во втором постоянно считываю, но первый поток работает медленнее второго и поэтому второй поток иногда вычитывает все элементы из очереди и ждет новых в каждом цикле проверяя очередь, для того чтобы не обратиться к пустой очереди, я сделал две защиты это проверка:

if (FifoBlock.Count != 0) или if (FifoBlock.Peek() != null)
и
try{
...
}
catch{
...
}


Так вот, проверка на количество элементов в очереди, очень ресурсозатратно работает.

Буду рад совету по оптимизации данного кода или другому алгоритму

Код:
using System.Collections;  
//...
//Очередь
Queue FifoBlock = new Queue();      

//THREAD1-READER 
public void Thr1()
{
 int len = xferSize;
 byte[] buf = new byte[xferSize];
 while (bRunTh1)
  {
   // code read data
   // ...
   // write data to buf
   FifoBlock.Enqueue(buf.Clone());                                
  }//while
}

//THREAD2-WRITER 
public void Thr2()
{
 while (bRunTh2)
 (
  try
    {
     if (FifoBlock.Count != 0)
      //if (FifoBlock.Peek() != null)
          {
           // Message: "FIFO: not empty ";
           buf = (byte[])FifoBlock.Dequeue();
           //code read buf and any process
           // ...
          }//fifoCount
        else
          {
           // Message: "FIFO: empty ";
          }
     }
   catch (InvalidOperationException)
     {
      // Message: "Exception: empty FIFO"
     }
)
fshlik вне форума Ответить с цитированием
Старый 15.02.2013, 07:06   #2
phomm
personality
Старожил
 
Аватар для phomm
 
Регистрация: 28.04.2009
Сообщений: 2,882
По умолчанию

Thread.Sleep() ?
phomm вне форума Ответить с цитированием
Старый 15.02.2013, 10:08   #3
fshlik
Новичок
Джуниор
 
Регистрация: 14.02.2013
Сообщений: 3
По умолчанию

Цитата:
Сообщение от phomm Посмотреть сообщение
Thread.Sleep() ?
Второй поток должен постоянно работать, он не сильно грузит систему,
проблема в передаче данных между потоками, если постоянно делать проверку
типа
Код:
if (FifoBlock.Count > 0)
, то это грузит систему,
самое простое решение при данном алгоритме, которое мне приходит в голову,
это ввести в первом потоке переменную bool flag и во втором потоке смотреть,
если данные пришли в первый поток flag=true, во втором потоке вычитал всю очередь, то
flag=false, и мониторить уже flag, это менее болезненно, чем мониторить очередь, но
мне это решение не очень нравится.
fshlik вне форума Ответить с цитированием
Старый 15.02.2013, 11:50   #4
phomm
personality
Старожил
 
Аватар для phomm
 
Регистрация: 28.04.2009
Сообщений: 2,882
По умолчанию

Цитата:
Сообщение от fshlik Посмотреть сообщение
Второй поток должен постоянно работать, он не сильно грузит систему,
проблема в передаче данных между потоками, если постоянно делать проверку
типа
Код:
if (FifoBlock.Count > 0)
, то это грузит систему,
Не замечаете несоответствие ? "второй поток не грузит" - "а если _код из второго потока_ - грузит"
Возможно , Вы что-то не так формулируете, ибо Вас сложно понять с такими логическими нестыковками

Я и предлагаю приостанавливать второй поток, пусть проверка (точнее вообще работа потока) идёт непостоянно, а периодами.
Код:
public void Thr2()
{
 while (bRunTh2)
 {
  Thread.Sleep(5);
  try
    {
     if (FifoBlock.Count != 0)
      //if (FifoBlock.Peek() != null)
          {
           // Message: "FIFO: not empty ";
           buf = (byte[])FifoBlock.Dequeue();
           //code read buf and any process
           // ...
          }//fifoCount
        else
          {
           // Message: "FIFO: empty ";
          }
     }
   catch (InvalidOperationException)
     {
      // Message: "Exception: empty FIFO"
     }
  }
}
Не проверял, но согласно моему опыту, тормоза должны пропасть.
Ну и, не понятно, как код работал при неправильных скобках.

Если проблема Ваша не решится, киньте весь проект архивом, подебажу малость.

Последний раз редактировалось phomm; 15.02.2013 в 11:55.
phomm вне форума Ответить с цитированием
Старый 15.02.2013, 12:24   #5
fshlik
Новичок
Джуниор
 
Регистрация: 14.02.2013
Сообщений: 3
По умолчанию

Цитата:
Сообщение от phomm Посмотреть сообщение

Не проверял, но согласно моему опыту, тормоза должны пропасть.
Ну и, не понятно, как код работал при неправильных скобках.

Если проблема Ваша не решится, киньте весь проект архивом, подебажу малость.
Скобки в блокноте правил ночью, половину не тех, а другие по забывал.
Проблему решил только что, оказалось дело не в очереди,
если как в моем случае запустить два потока и пустить их вообще
без каких-либо тормозов, т.е. без Thread,Sleep(1), то система видимо отдает
под программу ядро.
Добавил задержек и программа на загрузки ядер практически никак не влияет

Код:
using System.Collections;  
//...
//Очередь
Queue FifoBlock = new Queue();      

//THREAD1-READER 
public void Thr1()
{
 int len = xferSize;
 byte[] buf = new byte[xferSize];
 while (bRunTh1)
  {
   // code read data
   // ...
   // write data to buf
   FifoBlock.Enqueue(buf.Clone());
   Thread.Sleep(1);                                
  }//while
}

//THREAD2-WRITER 
public void Thr2()
{
 while (bRunTh2)
 {
  try
    {
     if (FifoBlock.Count != 0)
      //if (FifoBlock.Peek() != null)
          {
           // Message: "FIFO: not empty ";
           buf = (byte[])FifoBlock.Dequeue();
           //code read buf and any process
           // ...
          }//fifoCount
        else
          {
           // Message: "FIFO: empty ";
          }
     }
   catch (InvalidOperationException)
     {
      // Message: "Exception: empty FIFO"
     }
Thread.Sleep(2);
}//while
}
Мне на другом форуме kolorotur посоветовал вот так изменить алгоритм. правда там нужен .NET Framewokr 4, я пока его не проверял.
Код:
using System.Collections;  
using System.Collections.Concurrent;
//...
//Очередь
BlockingCollection<byte[]> FifoBlock = new BlockingCollection<byte[]>();      
 
//THREAD1-READER 
public void Thr1()
{
 int len = xferSize;
 byte[] buf = new byte[xferSize];
 while (bRunTh1)
  {
   // code read data
   // ...
   // write data to buf
   FifoBlock.Add(buf.Clone());                                
  }//while
}
 
//THREAD2-WRITER 
public void Thr2()
{
   foreach (var buf in FifoBlock.GetConsumingEnumerable())
   {
      ...
   }
}}
Спасибо за содействие в решении задачи)
fshlik вне форума Ответить с цитированием
Старый 16.02.2013, 13:27   #6
Демон
Пользователь
 
Регистрация: 12.11.2008
Сообщений: 17
Стрелка

Все что тут написано - за такое я бы по рукам надавал)
Какие задержки, простите?
Это же универская задача producer/consumer (поставщик/потребитель) про семафоры!
Вот написал простенький пример:
Код:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace For
{
    class Program
    {
        static void Main()
        {
            var semaphore = new SemaphoreSlim(0);
            var factory = new TaskFactory();
            var queue = new Queue<int>();

            factory.StartNew(() =>
                                 {
                                     // поток производитель
                                     while (true)
                                     {
                                         var item = 8;
                                         Thread.Sleep(1000); // задержка, симуляция долгого производства
                                         queue.Enqueue(item);
                                         semaphore.Release(); // к семафору +1
                                     }
                                 });


            factory.StartNew(() =>
                                 {
                                     // поток потребитель
                                     while (true)
                                     {
                                         semaphore.Wait(); // ждем семафора, потом -1
                                         var item = queue.Dequeue();
                                         Console.WriteLine(item);
                                     }
                                 });

            Console.ReadLine();
        }
    }
}
Также ответ с другого формума про блокирующие коллекции от kolorotur тоже хорош
Демон вне форума Ответить с цитированием
Ответ


Купить рекламу на форуме - 42 тыс руб за месяц



Похожие темы
Тема Автор Раздел Ответов Последнее сообщение
Поток (Thread) не хочет показывать панель (Panel). bagabuga Общие вопросы Delphi 7 30.03.2012 14:57
спонтанная ошибка при синхронизации потока Thread и основного приложения(Посылка сообщений) Человек_Борща Общие вопросы Delphi 2 14.05.2011 22:25
Thread. проблемы с работой потока. Моментально исчезают созданные в потоке формы. Casper-SC Общие вопросы .NET 3 24.04.2010 12:28
Thread.Приложение не отвечает. Не запускается поток. Zerofill Общие вопросы Delphi 2 10.10.2009 16:04