Форум программистов
 

Восстановите пароль или Зарегистрируйтесь на форуме, о проблемах и с заказом рекламы пишите сюда - alarforum@yandex.ru, проверяйте папку спам!

Вернуться   Форум программистов > IT форум > Помощь студентам
Регистрация

Восстановить пароль
Повторная активизация e-mail

Купить рекламу на форуме - 42 тыс руб за месяц

Ответ
 
Опции темы Поиск в этой теме
Старый 03.12.2016, 10:39   #1
FatalLight
Пользователь
 
Регистрация: 16.08.2010
Сообщений: 13
Вопрос Inter-process communication

Приветствую. Занялся изучением Inter-process communication.
Цель написать программу для синхронизации 3 процессов (не потоков в 1 процессе).
Суть программы должна быть такой:
запускается 3 процесса.
каждый из процессов, кроме последнего, ждёт пока все процессы-соседи запишут данные в общую память.
последний из записавших отправляет сигнал процессам-соседям продолжать.

код программы создающей общую память:
Код:
#include <unistd.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

unsigned const int SIZE = 0x1000;

struct shmem
{
	unsigned short int buff [SIZE];
	pthread_mutex_t mutex;
	pthread_mutex_t cond_mutex;
	pthread_cond_t cond;
};


//программа для создания общей памяти.
int main(int argc, char** argv)
{
	pthread_mutexattr_t mattr;
	pthread_mutexattr_init(&mattr);
	pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
	
	pthread_condattr_t cattr;
	pthread_condattr_init(&cattr);
	pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
	
	int flags = 0, mods = 0;
	flags |= O_RDWR | O_CREAT | O_EXCL;
	mods |= S_IRUSR | S_IWUSR;
	int fd = shm_open("shmem", flags, mods);
	if(fd == -1)
	{
		shm_unlink("shmem");
		fd = shm_open("shmem", flags, mods);
		puts("Память пересоздана!");
	}
	if(fd == -1)
	{
		printf("[%d] ERROR пересоздать память не удалось\n",fd);
		exit(-1);
	}
	ftruncate(fd, sizeof(shmem));
	void* addr_shmem = mmap(NULL, sizeof(shmem), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	shmem* p_shmem = static_cast<shmem*> (addr_shmem);
	
	pthread_mutex_init(&p_shmem->mutex, &mattr);
	pthread_mutex_init(&p_shmem->cond_mutex, &mattr);
	pthread_cond_init(&p_shmem->cond, &cattr);
	for(unsigned int i=0;i < SIZE;i++)
		p_shmem->buff[i]=0x5ACF;
	return 0;
}
код программы-процесс:

Код:
#include <unistd.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

unsigned const int SIZE = 0x1000;
unsigned const int OFFSET = 320;
unsigned const int SIZE_PACK = 32;
int proc = -1;
int proc1 = -1;
int proc2 = -1;

struct shmem
{
	unsigned short int buff [SIZE];
	pthread_mutex_t mutex;
	pthread_mutex_t cond_mutex;
	pthread_cond_t cond;
};

int main(int argc, char** argv)
{	
	// разбор параметров. параметром передаётся номер процесса.
	if (argc != 2)
	{
		printf("Не указан номер процесса");
		exit(0);
	}
	else
	{
		proc = atoi(argv[1]);// проверка ввода не осуществляется.
		printf("процесс = [%d] аргумент = [%s]\n", proc, argv[1]);
	}
	
	// получить общщую память.
	struct stat stat;
	int fd = shm_open("shmem", O_RDWR, S_IRUSR | S_IWUSR);
	fstat(fd, &stat);
	printf("размер памяти = [%ld]\n", stat.st_size);
	void* addr_shmem = mmap(NULL, stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	shmem* p_shmem = static_cast<shmem*> (addr_shmem);
	close(fd);

	// назначение соседних процессов.
	switch(proc)
	{
		case 0:
			proc1 = 1;
			proc2 = 2;
		break;
		case 1:
			proc1 = 0;
			proc2 = 2;
		break;
		case 2:
			proc1 = 0;
			proc2 = 1;
		break;
		default:
			puts("нельзя указывать номер процесса больше 2");
			exit(0);
	}

	unsigned long int count = 0;
	while(true)
	{
		pthread_mutex_lock(&p_shmem->mutex);

		for(unsigned int i=0;i<SIZE_PACK;i++)
			p_shmem->buff[SIZE_PACK*proc+i] = count;// записать данные(номер вхождения в цикл)
		p_shmem->buff[OFFSET+proc] = 0xAAAA;// записать значение что процесс записал все данные.
		printf("процесс [%d] записал все данные\n", proc);
		if(p_shmem->buff[OFFSET+proc1] == 0xAAAA && p_shmem->buff[OFFSET+proc2] == 0xAAAA)//проверить готовы ли соседние процессы
		{
			puts("Все процессы записали данные. ПУСК!!!");
			printf("[%X][%X][%X]\n",p_shmem->buff[OFFSET+proc],p_shmem->buff[OFFSET+proc1],p_shmem->buff[OFFSET+proc2]);
			p_shmem->buff[OFFSET+proc1] = 0x5555;// сбросить флаги готовности записи всех процессов.
			p_shmem->buff[OFFSET+proc2] = 0x5555;
			p_shmem->buff[OFFSET+proc] = 0x5555;
			
			pthread_mutex_lock(&p_shmem->cond_mutex);
			pthread_cond_broadcast(&p_shmem->cond);//отправить сигналы всем соседним процессам - продолжать.
			pthread_mutex_unlock(&p_shmem->cond_mutex);
		}
		else
		{
			puts("Не все процессы записали данные. WAIT!!!");
			printf("[%X][%X][%X]\n",p_shmem->buff[OFFSET+proc],p_shmem->buff[OFFSET+proc1],p_shmem->buff[OFFSET+proc2]);
			
			pthread_mutex_unlock(&p_shmem->mutex);
			
			pthread_mutex_lock(&p_shmem->cond_mutex);
			pthread_cond_wait(&(p_shmem->cond), &(p_shmem->cond_mutex));// остановить процесс до получения сигнала от соседнего процесса.
			pthread_mutex_unlock(&p_shmem->cond_mutex);
			
			pthread_mutex_lock(&p_shmem->mutex);
			
		}
		//вывод данных после записи всех процессов.
		puts("Данные");
		for(unsigned int i=0;i<SIZE_PACK;i++)
			printf("%X ", p_shmem->buff[SIZE_PACK*proc+i]);
		puts("");
		for(unsigned int i=0;i<SIZE_PACK;i++)
			printf("%X ", p_shmem->buff[SIZE_PACK*proc1+i]);
		puts("");
		for(unsigned int i=0;i<SIZE_PACK;i++)
			printf("%X ", p_shmem->buff[SIZE_PACK*proc2+i]);
		puts("");
		pthread_mutex_unlock(&p_shmem->mutex);
		count++;
	}
	return 0;
}
Вроде учёл всё, но возникает ситуации:
1) когда все три процесса начинают ждать-соседей.
2) происходит рассинхронизация в данных процессов (переменная count).

Прошу помощи. Что я делаю не так? почему такие ситуации возникают?
ниже ссылка на скриншот консоли.
https://yadi.sk/i/jf4Sh5dy325FGd

Последний раз редактировалось FatalLight; 03.12.2016 в 10:41. Причина: не добавил ссылку на скриншот
FatalLight вне форума Ответить с цитированием
Старый 03.12.2016, 13:57   #2
GreenWizard
мальчик-помогай =)
Форумчанин
 
Регистрация: 16.09.2010
Сообщений: 522
По умолчанию

Добавь логгирование и посмотри последовательность работы процессов.... это так, универсальное решение))
в код вникать лень, но какой-то он странный.... родительский процесс создаёт шару и вырубается, не ожидая окончания работы... дочерные процессы какие-то там флаги смотрят, хотя есть же объекты синхронизации... и какое-то оно переусложнённое)
GreenWizard вне форума Ответить с цитированием
Старый 03.12.2016, 17:06   #3
olej.tsil
Заблокирован
 
Регистрация: 29.11.2016
Сообщений: 215
По умолчанию

Цитата:
Сообщение от FatalLight Посмотреть сообщение
Занялся изучением Inter-process communication.
Возьмите вот эту книжку:

Цитата:
У. Ричард Стивенс
UNIX: взаимодействие процессов
ISBN: 5-318-00534-9
576 страниц
март 2003
Питер
http://www.books.ru/books/unix-vzaim...-23626/?show=1

И будет вам счастье ... - а природе нет лучше описания по IPC.

P.S. Никто, конечно, рыться в вашем громоздком коде не станет.
olej.tsil вне форума Ответить с цитированием
Старый 03.12.2016, 17:09   #4
olej.tsil
Заблокирован
 
Регистрация: 29.11.2016
Сообщений: 215
По умолчанию

Цитата:
Сообщение от FatalLight Посмотреть сообщение
каждый из процессов, кроме последнего, ждёт пока все процессы-соседи запишут данные в общую память.
1. shared memory не является ресурсом монопольного доступа, и доступ к нему нужно защищать ... например семафором или мютексом;
2. организовывать межпроцессное взаимодействие через shared memory - это одно из самых худших решений из всех, какие только можно придумать.
olej.tsil вне форума Ответить с цитированием
Старый 03.12.2016, 19:14   #5
eoln
Старожил
 
Аватар для eoln
 
Регистрация: 26.04.2008
Сообщений: 2,645
По умолчанию

Ну что же так категорично ))
По сабжу - двойная блокировка.
Допустим первый процесс сделал
Код:
pthread_mutex_unlock(&p_shmem->mutex);
в блоке else и завис на долю секунду по какой-то причине. Так как мьютекс разблокирован, то другие 2 процесса нормально отработали. Причём, последний поток успел сделать
Код:
pthread_cond_broadcast(&p_shmem->cond);//отправить сигналы всем соседним процессам - продолжать.
.
И тут отвисает самый первый и делает
pthread_cond_wait(&(p_shmem->cond), &(p_shmem->cond_mutex));// остановить процесс до получения сигнала от соседнего процесса.
Чтобы процесс продолжил работать, другим процессам нужно сделать
Код:
pthread_cond_broadcast(&p_shmem->cond);//отправить сигналы всем соседним процессам - продолжать.
, в другой итерации, вот вам и рассинхронизация в выводе данных.

Короче, нельзя использовать разные методы синхронизации внутри друг друга. Блокировка по той же причине

Последний раз редактировалось eoln; 03.12.2016 в 19:19.
eoln вне форума Ответить с цитированием
Старый 06.12.2016, 13:09   #6
FatalLight
Пользователь
 
Регистрация: 16.08.2010
Сообщений: 13
По умолчанию

Раз никто не помог конкретным решением, никто не захотел разбираться в коде.
Ниже приведу своё решение, авось кому-нибудь да понадобится (больше нет рассинхронизации данных и процессы не встают в ожидание):

Код:
#include <unistd.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

unsigned const int SIZE = 0x1000;
unsigned const int OFFSET = 320;
unsigned const int SIZE_PACK = 28;
int proc = -1;
int proc1 = -1;
int proc2 = -1;

struct shmem
{
	unsigned short int buff [SIZE];
	pthread_mutex_t mutex;
	pthread_mutex_t cond_mutex;
	pthread_cond_t cond;
};

int main(int argc, char** argv)
{	
	// разбор параметров. параметром передаётся номер процесса.
	if (argc != 2)
	{
		printf("Не указан номер процесса");
		exit(0);
	}
	else
	{
		proc = atoi(argv[1]);// проверка ввода не осуществляется.
		printf("процесс = [%d] аргумент = [%s]\n", proc, argv[1]);
	}
	
	// получить общщую память.
	struct stat stat;
	int fd = shm_open("shmem", O_RDWR, S_IRUSR | S_IWUSR);
	fstat(fd, &stat);
	printf("размер памяти = [%ld]\n", stat.st_size);
	void* addr_shmem = mmap(NULL, stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	shmem* p_shmem = static_cast<shmem*> (addr_shmem);
	close(fd);

	// назначение соседних процессов.
	switch(proc)
	{
		case 0:
			proc1 = 1;
			proc2 = 2;
		break;
		case 1:
			proc1 = 0;
			proc2 = 2;
		break;
		case 2:
			proc1 = 0;
			proc2 = 1;
		break;
		default:
			puts("нельзя указывать номер процесса больше 2");
			exit(0);
	}

	unsigned long int count = 0;
	while(true)
	{
		usleep(1);
		pthread_mutex_lock(&p_shmem->mutex);
		
		for(unsigned int i=0;i<SIZE_PACK;i++)
			p_shmem->buff[SIZE_PACK*proc+i] = count;// записать данные(номер вхождения в цикл)
		p_shmem->buff[OFFSET+proc] = 0xAAAA;// записать значение что процесс записал все данные.
		printf("процесс [%d] записал все данные\n", proc);
		if(p_shmem->buff[OFFSET+proc1] == 0xAAAA && p_shmem->buff[OFFSET+proc2] == 0xAAAA)//проверить готовы ли соседние процессы
		{
			puts("Все процессы записали данные. ПУСК!!!");
			printf("[%X][%X][%X]\n",p_shmem->buff[OFFSET+proc],p_shmem->buff[OFFSET+proc1],p_shmem->buff[OFFSET+proc2]);
			p_shmem->buff[OFFSET+proc1] = 0x5555;// сбросить флаги готовности записи всех процессов.
			p_shmem->buff[OFFSET+proc2] = 0x5555;
			p_shmem->buff[OFFSET+proc] = 0x5555;
			pthread_mutex_lock(&p_shmem->cond_mutex);
			pthread_cond_broadcast(&p_shmem->cond);//отправить сигналы всем соседним процессам - продолжать.
			pthread_mutex_unlock(&p_shmem->cond_mutex);
		}
		else
		{
			puts("Не все процессы записали данные. WAIT!!!");
			printf("[%X][%X][%X]\n",p_shmem->buff[OFFSET+proc],p_shmem->buff[OFFSET+proc1],p_shmem->buff[OFFSET+proc2]);
			pthread_mutex_unlock(&p_shmem->mutex);
			pthread_mutex_lock(&p_shmem->cond_mutex);
			while(p_shmem->buff[OFFSET+proc] != 0x5555)
				pthread_cond_wait(&(p_shmem->cond), &(p_shmem->cond_mutex));// остановить процесс до получения сигнала от соседнего процесса.
			pthread_mutex_unlock(&p_shmem->cond_mutex);
			pthread_mutex_lock(&p_shmem->mutex);
		}
		puts("Данные");
		for(unsigned int i=0;i<SIZE_PACK;i++)
			printf("%X ", p_shmem->buff[SIZE_PACK*proc+i]);
		puts("");
		for(unsigned int i=0;i<SIZE_PACK;i++)
			printf("%X ", p_shmem->buff[SIZE_PACK*proc1+i]);
		puts("");
		for(unsigned int i=0;i<SIZE_PACK;i++)
			printf("%X ", p_shmem->buff[SIZE_PACK*proc2+i]);
		puts("");
		pthread_mutex_unlock(&p_shmem->mutex);
		count++;
	}
	return 0;
}
З.Ы. Прошу обратить внимание на первое условие внутри цикла, и то как расставлены блокировки мьютексов в положительной ветке. По факту оказалось можно.
FatalLight вне форума Ответить с цитированием
Старый 06.12.2016, 13:42   #7
eoln
Старожил
 
Аватар для eoln
 
Регистрация: 26.04.2008
Сообщений: 2,645
По умолчанию

Цитата:
Сообщение от FatalLight Посмотреть сообщение
По факту оказалось можно.
Не можно )).
Между
Код:
while(p_shmem->buff[OFFSET+proc] != 0x5555)
и
Код:
pthread_cond_wait(&(p_shmem->cond), &(p_shmem->cond_mutex));// остановить процесс до получения сигнала от соседнего процесса.
может быть переключение на другой процесс. Сначала выполнится проверка p_shmem->buff[OFFSET+proc] != 0x5555, затем процесс по воле Богов зависнет, другой процесс отработает как надо (при этом условие уже окажется не верным), и тут отвисаем мы и ... опять двадцать шесть. Нельзя работать с общими данными вне синхронизированного блока.
Снижена вероятность такой ситуации, но всего лишь СНИЖЕНА, а не исключена.
Я вот, например, не боюсь в грозу по улице ходить, хотя https://youtu.be/XXgZSd4fjn8?t=64
Смотри в сторону семафоров, это такой же ядрёный объект, только ещё и количество процессов регулирует (на случай если соседей будет не 2, а 23).
eoln вне форума Ответить с цитированием
Ответ


Купить рекламу на форуме - 42 тыс руб за месяц



Похожие темы
Тема Автор Раздел Ответов Последнее сообщение
Вылетаю после inter'a Carn Общие вопросы C/C++ 4 23.03.2010 23:58
(process.h) Компилятор говорит Process не объявлен Парсифаль Общие вопросы C/C++ 0 15.02.2010 00:27
Wireless Communication Devices fize Помощь студентам 1 05.01.2010 16:09
Триггеры в Inter Base 7.0 Claster БД в Delphi 9 13.05.2009 18:10