初步实现了书上案例第二,三问的要求,对输出结果有部分偏差,没有实现对已完成任务状态的记录,因此已完成任务输出无论如何都是0,明天会在record中加一个字段进行已完成任务状态的记录
(2) 添加一个名为job_centre:statistics()的统计函数,让它报告队列内、进行中和已完
成任务的状态。
(3) 添加监视工人进程的代码。如果某个工人进程挂了,请确保它所执行的任务被返回到等待完成的任务池里。
-module(gen_server_test).-export([start_link/0, add_job/2, work_wanted/0, job_done/1,statistics/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,terminate/2, code_change/3]).
-export([test_job_centre/0]).
-define(SERVER, ?MODULE).%%创建一个记录,记录中包含了一个工作队列,与下一个工作任务编号
-record(state,
{jobs = queue:new(), next_job_number = 1,workers = gb_sets:new()
}).start_link() ->gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
add_job(Fun,WorkerPid) ->gen_server:call(?SERVER, {add_job, Fun,WorkerPid}).
work_wanted() ->gen_server:call(?SERVER, work_wanted).
job_done(JobNumber) ->gen_server:call(?SERVER, {job_done, JobNumber}).%%调用该函数统计让它报告队列内、进行中和已完成任务的状态。
statistics() ->gen_server:call(?SERVER, statistics).
%%---------------------------------------------------------------------------
init([]) ->{ok, #state{}}.handle_call({add_job, Fun,WorkerPid}, _From, State) ->%%向队列中添加元素,队列的任务为执行Fun中的操作NewJobs = queue:in({State#state.next_job_number, Fun}, State#state.jobs),NewWorkers = gb_sets:add(WorkerPid, State#state.workers),% 建立与工人的连接link(WorkerPid),{reply, State#state.next_job_number,State#state{jobs = NewJobs, next_job_number = State#state.next_job_number + 1, workers = NewWorkers}};
handle_call(work_wanted, _From, State) ->IsEmpty = queue:is_empty(State#state.jobs),case IsEmpty oftrue ->{reply, no, State};false ->{{value, {JobNumber, Fun}}, NewJobs} = queue:out(State#state.jobs),{reply, {JobNumber, Fun}, State#state{jobs = NewJobs}}end;
handle_call({job_done, _}, _From, State) ->{reply, ok, State};
handle_call(statistics, _From, State) ->{reply, #{waiting => queue:len(State#state.jobs), in_progress => gb_sets:size(State#state.workers), completed => 0}, State};
handle_call(Request, _From, State) ->{reply, {error, {unknown_request, Request}}, State}.handle_cast(_Msg, State) ->{noreply, State}.
handle_info({exit, WorkerPid, _Reason}, State) ->%% 如果工人进程崩溃,则从workers集合中移除,并将任务重新添加到队列NewWorkers = gb_sets:delete(WorkerPid, State#state.workers),%% 查找并重新添加任务{ok, Fun} = find_task_for_worker(WorkerPid, State#state.jobs),NewJobs = queue:in({State#state.next_job_number - 1, Fun}, State#state.jobs),{noreply, State#state{workers = NewWorkers, jobs = NewJobs}};
handle_info(_Info, State) ->{noreply, State}.terminate(_Reason, _State) ->ok.code_change(_OldVsn, State, _Extra) ->{ok, State}.find_task_for_worker(WorkerPid, Jobs) ->case gb_sets:is_member(WorkerPid, gb_sets:from_list(Jobs)) oftrue ->%% 找到对应的任务{value, {_, Fun}} = queue:out(Jobs),{ok, Fun};false ->{error, not_found}end.
test_job_centre() ->%%创建两个任务Job1 = fun() -> io:format("Doing job 1~n") end,Job2 = fun() -> io:format("Doing job 2~n") end,%%将两个任务放入任务队列中JobNumber1 = add_job(Job1,self()),io:format("Job number 1 is ~p~n", [JobNumber1]),JobNumber2 = add_job(Job2,self()),io:format("Job number 2 is ~p~n", [JobNumber2]),%%工人领取任务队列任务{JobNumber1, Job1} = work_wanted(),io:format("Doing job ~p~n", [{JobNumber1,Job1}]),{JobNumber2, Job2} = work_wanted(),io:format("Doing job ~p~n", [{JobNumber2,Job2}]),%%表示任务完成job_done(JobNumber1),job_done(JobNumber2),case work_wanted() ofno ->io:format("No more jobs~n");{JobNumber, Job} ->io:format("Doing job ~p~n", [{JobNumber,Job}])end,%%模拟一个工人进程崩溃退出exit(normal),ok.
程序输出结果
不知道为什么任务队列中设置的io没有输出,只输出了调用函数中的io,同时正在执行的进程数量为1