IPC之信号量·即时通讯小程序(三)

发布时间:2020-10-22 08:00:16 作者:SherryX
来源:网络 阅读:2634

上次说到解决并发的问题,需要用到信号量。下面,简单复习一下。

信号量

信号量是一种变量,它只能取正整数值,对这些正整数只能进行两种操作:等待和信号。(在我的理解,信号量就是用来访问一些临界资源而设计的)
用两种记号来表示信号量的这两种操作:
P(semaphore variable) 代表等待(请求资源)
V(semaphore variable) 代表信号(释放资源)

信号量的分类

运行结果:
先运行./producer。 当生产到10个产品之后,就会阻塞。此时运行./consumer,每3s取走一个产品,1s后生产者会生产一个产品。

即时通讯小程序

现在,可以继续我们的小程序了。有了以上两个练习之后,就更容易理解了。跟生产者消费者问题很类似。
我们为读写设置两个信号量,一个控制读,一个控制写。写信号量初始化为1,写完后-1,变成0。读信号量初始化为0,写操作后,变成在线用户数-1。如果所有用户都读完了,读信号量为0,写信号量为1。
public.h:

#ifndef _PUBLIC_H_
#define _PUBLIC_H_

#include < stdio.h>
#include < string.h>
#include < sys/types.h>
#include < sys/ipc.h>
#include < sys/msg.h>
#include < sys/shm.h>
#include < signal.h>
#include < sys/sem.h>
#include < string>
#include < map>
#include < iostream>
using namespace std;

//用户信息结构体
typedef struct user_t
{
    pid_t pid;
    char uname[10]; //后面加上用户名不重名、密码验证
}USER_T;

//登录消息队列结构体
typedef struct login_msg_t
{
    long type;
    USER_T user;
}LMSG_T;

//聊天消息结构体
typedef struct msg_t
{
    USER_T user;
    char acMsg[100];
}MSG_T;

//消息队列:用户登录
#define LOGIN_TYPE          1
#define EXIT_TYPE           2
#define MSG_KEY             1000
#define MSG_SIZE            sizeof(LMSG_T)-sizeof(long)

//共享内存:用户列表(空闲块:0-空闲,1-占用)
#define SHM_USER_KEY        1001
#define MAX_USER            100
#define SHM_USER_SIZE       MAX_USER + MAX_USER * sizeof(USER_T)

//共享内存:聊天内容
#define SHM_MSG_KEY         1002
#define SHM_MSG_SIZE        sizeof(MSG_T)

//信号:更新用户列表,读消息
#define SIGNAL_USERS        34
#define SIGNAL_CHAT         35

//读写信号量
#define SEM_KEY             1003

union semun {
   int              val;    /* Value for SETVAL */
   struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
   unsigned short  *array;  /* Array for GETALL, SETALL */
   struct seminfo  *__buf;  /* Buffer for IPC_INFO
                               (Linux-specific) */
};
//两个信号量,一个控制读,一个控制写
//写信号量初始化为1,写完后-1,变成0
//读信号量初始化为0,写操作后,变成在线用户数-1
//如果所有用户都读完了,读信号量为0,写信号量为1
union semun sem = {0};
struct sembuf buf1 = {0};   //写
struct sembuf buf2 = {0};   //读
#endif

server.cpp:

#include " public.h"

int main()
{
    int msg_id;
    int shm_id;
    LMSG_T loginMsg = {0};
    char *userAddr;
    USER_T *pUser;  //用户真正写入的地址
    map<int,string> userMap;    //用户列表
    map<int,string>::iterator it;
    int i;

    /*1、创建消息队列:用户登录*/
    msg_id = msgget(MSG_KEY,0);
    if(msg_id == -1)
    {
        msg_id = msgget(MSG_KEY,IPC_CREAT);
        if (msg_id == -1)
        {
            perror("server msgget");
            return -1;
        }
    }

    /*2、创建共享内存:用户列表*/
    shm_id = shmget(SHM_USER_KEY,0,0);
    if (shm_id != -1)
    {//已经存在,删除
        shmctl(shm_id,IPC_RMID,NULL);
    }
    shm_id = shmget(SHM_USER_KEY,SHM_USER_SIZE,IPC_CREAT);
    userAddr = (char *)shmat(shm_id,NULL,0);//映射
    pUser = (USER_T *)(userAddr + MAX_USER);
    memset(userAddr,0,SHM_USER_SIZE);//初始化

    /*3、创建共享内存:聊天信息*/
    int shm_msg_id = shmget(SHM_MSG_KEY,0,0);
    if (shm_msg_id != -1)
    {
        shmctl(shm_msg_id,IPC_RMID,NULL);
    }
    shm_msg_id = shmget(SHM_MSG_KEY,SHM_MSG_SIZE,IPC_CREAT);
    char *msgAddr = (char *)shmat(shm_msg_id,NULL,0);
    memset(msgAddr,0,SHM_MSG_SIZE);

    /*4、创建信号量*/
    int sem_id;
    sem_id = semget(SEM_KEY,0,0);
    if (sem_id != -1)
    {
        semctl(sem_id,0,IPC_RMID);
        semctl(sem_id,1,IPC_RMID);
    }
    sem_id = semget(SEM_KEY,2,IPC_CREAT);
    //初始化信号量的值
    sem.val = 1;
    semctl(sem_id,0,SETVAL,sem);    //写
    sem.val = 0;
    semctl(sem_id,1,SETVAL,sem);    //读

    //一直监听,是否有用户上线
    while (1)
    {
        memset(&loginMsg,0,sizeof(LMSG_T));
        msgrcv(msg_id,&loginMsg,MSG_SIZE,0,0);  //任何消息都接收
        switch(loginMsg.type)
        {
        case LOGIN_TYPE:
            //登录
            cout<<"client "<<loginMsg.user.uname<<":"<<loginMsg.user.pid<<" is logining..."<<endl;
            //2.1 将登录信息写入共享内存(先找到空闲块)
            for (i = 0 ; i < MAX_USER ; i++)
            {
                if (*(userAddr + i) == 0)
                {
                    //空闲
                    break;
                }
            }
            if (i < MAX_USER)
            {
                *(userAddr + i) = 1;
                *(pUser + i) = loginMsg.user;
                userMap.insert( pair<int,string>(loginMsg.user.pid,loginMsg.user.uname) );
            }
            else
            {
                cout<<"online users are full.\n"<<endl;
                return 1;
            }
            //2.2 发消息通知所有在线用户
            for (it = userMap.begin();it != userMap.end();it++)
            {
                kill((*it).first,SIGNAL_USERS);
            }

            break;
        case EXIT_TYPE:
            //退出
            cout<<"client "<<loginMsg.user.uname<<":"<<loginMsg.user.pid<<" is exiting..."<<endl;
            for (i = 0 ; i < MAX_USER ; i++)
            {
                if ((pUser+i)->pid == loginMsg.user.pid)
                {
                    *(userAddr+i) = 0;
                    break;
                }
            }
            for (it = userMap.begin();it != userMap.end();it++)
            {
                if ((*it).first == loginMsg.user.pid)
                {
                    continue;   //自己退出,不用再通知自己
                }
                kill((*it).first,SIGNAL_USERS);
            }
            break;
        }

    }

    return 0;
}

client.cpp:

#include " public.h"

char *userAddr;
USER_T *pUser;

char *msgAddr;
MSG_T *pMsg;

map<int,string> userMap;    //用户列表
map<int,string>::iterator it;

int sem_id;

void PrtUserList(int sig_no)
{
    //读取共享内存里的用户列表数据
    userMap.clear();
    cout<<"==== online users ===="<<endl;
    for (int i = 0 ;i < MAX_USER ; i++)
    {
        if(*(userAddr + i) == 1)
        {
            cout<<(pUser + i)->uname<<endl;
            userMap.insert(pair<int,string>( (pUser+i)->pid, (pUser+i)->uname ));
        }
    }
    cout<<"========= "<<userMap.size()<<" ========="<<endl;
}

void GetChatMsg(int sig_no)
{
    //读取共享内存里的聊天内容
    sleep(10);
    MSG_T msg = {0};
    msg = *pMsg;
    cout<<"receive msg from "<<msg.user.uname<<" : "<<msg.acMsg<<endl;

    // 4.3 读完之后:读信号量-1,读到最后一个用户,读信号量为0,写信号量再设为1
    buf2.sem_num = 1;
    buf2.sem_op = -1;
    buf2.sem_flg = SEM_UNDO;
    semop(sem_id,&buf2,1);

    if (semctl(sem_id,1,GETVAL) == 0) //读信号量为0
    {
//      printf("done.\n");
        sem.val = 1;
        semctl(sem_id,0,SETVAL,sem);
    }
}

int main()
{
    char acOrder[20] = "";
    int msg_id;
    LMSG_T loginMsg = {0};
    char uname[10] = "";
    int shm_id;
    char toWho[10] = "";    //聊天对象
    MSG_T msg = {0};    //聊天消息结构体
    char acMsg[100] = "";   //聊天内容

    cout<<"------------onlineChat-------------"<<endl;
    cout<<"username:";
    cin>>uname;

    //2.3 注册消息(放在最前面)
    signal(SIGNAL_USERS,PrtUserList);
    signal(SIGNAL_CHAT,GetChatMsg);

    /*2、打开用户列表共享内存(要比写消息队列早)*/
    shm_id = shmget(SHM_USER_KEY,0,0);
    if (shm_id == -1)
    {
        perror("client userlist shmget");
        return -1;
    }
    userAddr = (char*)shmat(shm_id,NULL,0);
    pUser = (USER_T*)(userAddr + MAX_USER);

    /*3、打开聊天信息共享内存*/
    int shm_msg_id = shmget(SHM_MSG_KEY,0,0);
    if (shm_msg_id == -1)
    {
        perror("client chatmsg shmget");
        return -1;
    }
    msgAddr = (char *)shmat(shm_msg_id,NULL,0);
    pMsg = (MSG_T *)msgAddr;

    /*4、打开信号量*/
    sem_id = semget(SEM_KEY,0,0);
    if (sem_id == -1)
    {
        perror("client semget");
        return -1;
    }

    /*1、打开消息队列*/
    msg_id = msgget(MSG_KEY,0);
    if(msg_id == -1)
    {
        perror("client msgget");
        return -1;
    }
    //登录,写消息队列
    loginMsg.type = LOGIN_TYPE;     //设置登录的消息类型为1
    loginMsg.user.pid = getpid();
    memcpy(loginMsg.user.uname,uname,strlen(uname));
    cout<<loginMsg.user.uname<<" is logining..."<<endl;
    msgsnd(msg_id,&loginMsg,MSG_SIZE,0);

    //等待写
    while(1)
    {
        putchar('#');
        fflush(stdout);
        scanf("%s",acOrder);
        if (strcmp(acOrder,"exit") == 0)    //退出
        {
            cout<<loginMsg.user.uname<<" is exiting..."<<endl;
            loginMsg.type = EXIT_TYPE;      //设置退出的消息类型为2
            msgsnd(msg_id,&loginMsg,MSG_SIZE,0);
            break;
        }
        else if (strcmp(acOrder,"users") == 0)  //查看在线用户列表
        {
            kill(getpid(),SIGNAL_USERS);
        }
        else if (strcmp(acOrder,"chat") == 0)   //进入聊天模式
        {
            cout<<"to who: ";
            cin>>toWho;
            cout<<"say: ";
            memset(acMsg,0,100);
            cin>>acMsg;

            // 4.1 写之前:P(等待/请求)操作,写信号量-1
            buf1.sem_num = 0;
            buf1.sem_op = -1;
            buf1.sem_flg = SEM_UNDO;
            semop(sem_id,&buf1,1);

            // 3.1 把聊天内容写进共享内存
            memcpy(msg.acMsg,acMsg,strlen(acMsg));
            msg.user = loginMsg.user;
            memcpy(msgAddr,&msg,SHM_MSG_SIZE);

            if (strcmp(toWho,"all") == 0)   //群聊
            {

                // 4.2 写之后:设置读信号量为在线用户数-1
                sem.val = userMap.size() - 1;
                semctl(sem_id,1,SETVAL,sem);

                //通知所有人去读
                for (it = userMap.begin();it != userMap.end();it++)
                {
                    if ((*it).first != getpid())
                    {
                        kill((*it).first,SIGNAL_CHAT);
                    }
                }
            }
            else    //私聊
            {
                for (it = userMap.begin();it != userMap.end();it++)
                {
                    if (strcmp((*it).second.c_str() , toWho) == 0)
                    {
                        kill((*it).first,SIGNAL_CHAT);
                        break;
                    }
                }
            }
        }
        memset(acOrder,0,sizeof(acOrder));
    }

    //解除映射
    shmdt(&userAddr);
    shmdt(&msgAddr);

    return 0;
}

运行结果:
运行结果就不演示了,此时,再sleep(),模拟并发的情况,就不会出现上次的问题了,毕竟此时的聊天内容的共享内存已经变成临界资源了,用新号良控制之后,就不会有同时读写的问题了。
未完待续....

推荐阅读:
  1. 「小程序JAVA实战」小程序视图之条件判断(15)
  2. 「小程序JAVA实战」小程序视图之细说列表渲染(14)

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

信号量 生产者 消费者

上一篇:Android沉浸式状态栏设计的实例代码

下一篇:idea 无法debug调试的解决方案

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》