Coverage for drivers/lock_queue.py : 85%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) Cloud Software Group, Inc.
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published
5# by the Free Software Foundation; version 2.1 only.
6#
7# This program is distributed in the hope that it will be useful,
8# but WITHOUT ANY WARRANTY; without even the implied warranty of
9# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10# GNU Lesser General Public License for more details.
11#
12# You should have received a copy of the GNU Lesser General Public License
13# along with this program; if not, write to the Free Software Foundation, Inc.,
14# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16import os
17import pickle
18import sys
19import time
21import lock
22import util
25DEBUG_LOG = True
27def debug_log(msg):
28 if DEBUG_LOG: 28 ↛ exitline 28 didn't return from function 'debug_log', because the condition on line 28 was never false
29 util.SMlog("LockQueue: " + msg)
31def get_process_start_time(pid):
32 proc_file = f"/proc/{pid}/stat"
33 with open(proc_file, 'r') as f:
34 return f.read().split(')')[-1].split(' ')[20]
36def process_is_valid(pid, start_time):
37 proc_file = f"/proc/{pid}/stat"
39 try:
40 if start_time != get_process_start_time(pid): 40 ↛ 47line 40 didn't jump to line 47, because the condition on line 40 was never false
41 debug_log(f"Process {pid} has incorrect start time real:{get_process_start_time(pid)} vs expected:{start_time}")
42 return False
43 except FileNotFoundError:
44 debug_log(f"Process {pid} is dead")
45 return False
47 return True
49class LockQueue:
50 def __init__(self, name):
51 self.name = name
52 self._queue_lock = lock.Lock(name, f"ql-{name}")
53 self._action_lock = lock.Lock(name, f"al-{name}")
54 # Filename to hold the process queue
55 self._mem = f"/tmp/mem-{name}"
57 def load_queue(self):
58 try:
59 with open(self._mem, "rb") as f:
60 queue = pickle.load(f)
61 debug_log("load_queue {}".format(queue))
62 except EOFError: 62 ↛ 63line 62 didn't jump to line 63, because the exception caught by line 62 didn't happen
63 queue = []
64 except FileNotFoundError:
65 queue = []
66 return queue
68 def save_queue(self, queue):
69 with open(self._mem, "w+b") as f:
70 pickle.dump(queue, f)
71 debug_log("save_queue {}".format(queue))
73 def push_into_process_queue(self):
74 self._queue_lock.acquire()
76 queue = self.load_queue()
77 queue.append((os.getpid(), get_process_start_time(os.getpid())))
78 self.save_queue(queue)
80 self._queue_lock.release()
82 def __enter__(self):
83 # Add ourselves to the process queue.
84 self.push_into_process_queue()
86 # Keep reading the process queue until we are at the front
87 while True:
88 self._queue_lock.acquire()
89 queue = self.load_queue()
90 front_pid, front_start_time = queue.pop(0)
91 debug_log(f"Testing for PID {front_pid}")
92 if front_pid == os.getpid():
93 # We are at the front, it is now our turn to wait on the action lock
94 # and then do our work
95 debug_log(f"{front_pid} taking action lock")
96 self._action_lock.acquire()
97 # When we have the action lock, save the queue (which no longer
98 # includes us) and release the queue lock to let others join.
99 self.save_queue(queue)
100 self._queue_lock.release()
101 break
103 # Getting here means it was not our turn to do stuff
104 # If the process at the front of the queue is not alive then remove it
105 if not process_is_valid(front_pid, front_start_time): 105 ↛ 113line 105 didn't jump to line 113, because the condition on line 105 was never false
106 # front pid has already been popped from queue so just save it
107 debug_log(f"Removing invalid process {front_pid}")
108 self.save_queue(queue)
109 # Release the lock and try again later. Most waiting will be on the queue lock,
110 # waiting for the single Action lock waiter to release it when it has the action
111 # lock. We sleep a short while before our next check to make it easier for new
112 # waiters to join the queue without really wasting our own time.
113 self._queue_lock.release()
114 time.sleep(0.1)
116 debug_log("In manager")
117 return self
119 def __exit__(self, type, value, tbck):
120 self._action_lock.release()