open Unix;; module type S = sig exception Try_again val resolve : string -> host_entry val try_resolve : string -> host_entry end module Make (P : sig val resolve : string -> host_entry val max_active_threads : int end) = struct;; exception Try_again;; let max_active_threads = P.max_active_threads;; type result = Available of host_entry option | Wait of Mutex.t * Condition.t;; module Cache = Map.Make (struct type t = string let compare = String.compare end);; type cache = { mutable map : result ref Cache.t; mutex : Mutex.t; write_enable : Condition.t; mutable readers : int };; let cache = { map = Cache.empty; mutex = Mutex.create(); write_enable = Condition.create (); readers = 0 };; type jobs = (* Queue.t contrain references but they do not change while in queue *) { queue : (string * result ref * Condition.t) Queue.t; lock : Mutex.t; non_empty : Condition.t; mutable active_threads : int };; let jobs = { queue = Queue.create(); lock = Mutex.create(); non_empty = Condition.create(); active_threads = 1 (* XXX -- should be 0 *) };; let compute name = Available (try Some (P.resolve name) with Not_found -> None) let rec worker (name, update, wakeup) = update := compute name; Condition.broadcast wakeup; (* We check for more work *) Mutex.lock jobs.lock; match Queue.length jobs.queue with | 0 -> (* No more work: we die *) jobs.active_threads <- jobs.active_threads - 1; Mutex.unlock jobs.lock; Thread.exit () | _ -> let r = Queue.take jobs.queue in Mutex.unlock jobs.lock; worker r;; let add_request r = Mutex.lock jobs.lock; if Queue.is_empty jobs.queue then Condition.broadcast jobs.non_empty; Queue.add r jobs.queue; Mutex.unlock jobs.lock;; let manager () = while true do Mutex.lock jobs.lock; while Queue.is_empty jobs.queue || jobs.active_threads > max_active_threads do Condition.wait jobs.non_empty jobs.lock done; let request = Queue.take jobs.queue in jobs.active_threads <- jobs.active_threads + 1; Mutex.unlock jobs.lock; ignore (Thread.create worker request); done;; Thread.create manager ();; let lock_cache_for_write() = Mutex.lock cache.mutex; while cache.readers > 0 do Condition.wait cache.write_enable cache.mutex done let unlock_cache_for_write() = Mutex.unlock cache.mutex;; let lock_cache_for_read() = Mutex.lock cache.mutex; cache.readers <- cache.readers + 1; Mutex.unlock cache.mutex let unlock_cache_for_read() = Mutex.lock cache.mutex; cache.readers <- cache.readers - 1; if cache.readers = 0 then Condition.broadcast cache.write_enable; Mutex.unlock cache.mutex let resolve_with blocking name = let r = try (* on essaye d'abord en lecteur *) lock_cache_for_read(); let r = Cache.find name cache.map in unlock_cache_for_read(); r with Not_found -> unlock_cache_for_read(); (* Maintenant on va peut-e^tre devoir modifier la structure *) (* Il faut donc se placer en ecrivain, sinon deux processus pourrait decider de faire le meme ajoute en parallele *) lock_cache_for_write(); try let r = Cache.find name cache.map in unlock_cache_for_write(); r with Not_found -> let lock = Mutex.create() in let wakeup = Condition.create() in let r = ref (Wait (lock, wakeup)) in cache.map <- Cache.add name r cache.map; unlock_cache_for_write(); add_request (name, r, wakeup); r in let return = function Some entry -> entry | None -> raise Not_found in match !r with | Available entry -> return entry | Wait (lock, wakeup) when not blocking -> raise Try_again | Wait (lock, wakeup) -> Mutex.lock lock; Condition.wait wakeup lock; Mutex.unlock lock; match !r with Available entry -> return entry | _ -> assert false;; let try_resolve = resolve_with false let resolve = resolve_with true end;;