Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) Cloud Software Group, Inc. 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU Lesser General Public License as published 

5# by the Free Software Foundation; version 2.1 only. 

6# 

7# This program is distributed in the hope that it will be useful, 

8# but WITHOUT ANY WARRANTY; without even the implied warranty of 

9# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

10# GNU Lesser General Public License for more details. 

11# 

12# You should have received a copy of the GNU Lesser General Public License 

13# along with this program; if not, write to the Free Software Foundation, Inc., 

14# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 

15 

16import os 

17import pickle 

18import sys 

19import time 

20 

21import lock 

22import util 

23 

24 

25DEBUG_LOG = True 

26 

27def debug_log(msg): 

28 if DEBUG_LOG: 28 ↛ exitline 28 didn't return from function 'debug_log', because the condition on line 28 was never false

29 util.SMlog("LockQueue: " + msg) 

30 

31def get_process_start_time(pid): 

32 proc_file = f"/proc/{pid}/stat" 

33 with open(proc_file, 'r') as f: 

34 return f.read().split(')')[-1].split(' ')[20] 

35 

36def process_is_valid(pid, start_time): 

37 proc_file = f"/proc/{pid}/stat" 

38 

39 try: 

40 if start_time != get_process_start_time(pid): 40 ↛ 47line 40 didn't jump to line 47, because the condition on line 40 was never false

41 debug_log(f"Process {pid} has incorrect start time real:{get_process_start_time(pid)} vs expected:{start_time}") 

42 return False 

43 except FileNotFoundError: 

44 debug_log(f"Process {pid} is dead") 

45 return False 

46 

47 return True 

48 

49class LockQueue: 

50 def __init__(self, name): 

51 self.name = name 

52 self._queue_lock = lock.Lock(name, f"ql-{name}") 

53 self._action_lock = lock.Lock(name, f"al-{name}") 

54 # Filename to hold the process queue 

55 self._mem = f"/tmp/mem-{name}" 

56 

57 def load_queue(self): 

58 try: 

59 with open(self._mem, "rb") as f: 

60 queue = pickle.load(f) 

61 debug_log("load_queue {}".format(queue)) 

62 except EOFError: 62 ↛ 63line 62 didn't jump to line 63, because the exception caught by line 62 didn't happen

63 queue = [] 

64 except FileNotFoundError: 

65 queue = [] 

66 return queue 

67 

68 def save_queue(self, queue): 

69 with open(self._mem, "w+b") as f: 

70 pickle.dump(queue, f) 

71 debug_log("save_queue {}".format(queue)) 

72 

73 def push_into_process_queue(self): 

74 self._queue_lock.acquire() 

75 

76 queue = self.load_queue() 

77 queue.append((os.getpid(), get_process_start_time(os.getpid()))) 

78 self.save_queue(queue) 

79 

80 self._queue_lock.release() 

81 

82 def __enter__(self): 

83 # Add ourselves to the process queue. 

84 self.push_into_process_queue() 

85 

86 # Keep reading the process queue until we are at the front 

87 while True: 

88 self._queue_lock.acquire() 

89 queue = self.load_queue() 

90 front_pid, front_start_time = queue.pop(0) 

91 debug_log(f"Testing for PID {front_pid}") 

92 if front_pid == os.getpid(): 

93 # We are at the front, it is now our turn to wait on the action lock 

94 # and then do our work 

95 debug_log(f"{front_pid} taking action lock") 

96 self._action_lock.acquire() 

97 # When we have the action lock, save the queue (which no longer 

98 # includes us) and release the queue lock to let others join. 

99 self.save_queue(queue) 

100 self._queue_lock.release() 

101 break 

102 

103 # Getting here means it was not our turn to do stuff 

104 # If the process at the front of the queue is not alive then remove it 

105 if not process_is_valid(front_pid, front_start_time): 105 ↛ 113line 105 didn't jump to line 113, because the condition on line 105 was never false

106 # front pid has already been popped from queue so just save it 

107 debug_log(f"Removing invalid process {front_pid}") 

108 self.save_queue(queue) 

109 # Release the lock and try again later. Most waiting will be on the queue lock, 

110 # waiting for the single Action lock waiter to release it when it has the action 

111 # lock. We sleep a short while before our next check to make it easier for new 

112 # waiters to join the queue without really wasting our own time. 

113 self._queue_lock.release() 

114 time.sleep(0.1) 

115 

116 debug_log("In manager") 

117 return self 

118 

119 def __exit__(self, type, value, tbck): 

120 self._action_lock.release()