Skip to content

Commit 6c3220f

Browse files
committed
module: summary line
work in progress... - separating pin request handling from actual pinning - later patch (?): pick up pins in state PINNING when HA master changes Motivation: Modification: Result: Target: master Request: branch Patch: rb-entry Fixes: #github-issue Requires-notes: no Requires-book: no Acked-by:
1 parent 9f4a464 commit 6c3220f

File tree

12 files changed

+1429
-622
lines changed

12 files changed

+1429
-622
lines changed

modules/dcache/src/main/java/org/dcache/pinmanager/PinManager.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static org.dcache.pinmanager.model.Pin.State.FAILED_TO_UNPIN;
77
import static org.dcache.pinmanager.model.Pin.State.PINNED;
88
import static org.dcache.pinmanager.model.Pin.State.PINNING;
9+
import static org.dcache.pinmanager.model.Pin.State.READY_TO_PIN;
910
import static org.dcache.pinmanager.model.Pin.State.READY_TO_UNPIN;
1011
import static org.dcache.pinmanager.model.Pin.State.UNPINNING;
1112

@@ -35,6 +36,7 @@ public class PinManager implements CellMessageReceiver, LeaderLatchListener, Cel
3536

3637
private static final Logger LOGGER = LoggerFactory.getLogger(PinManager.class);
3738
private static final long INITIAL_EXPIRATION_DELAY = SECONDS.toMillis(15);
39+
private static final long INITIAL_PIN_DELAY = SECONDS.toMillis(30);
3840
private static final long INITIAL_UNPIN_DELAY = SECONDS.toMillis(30);
3941

4042
private ScheduledExecutorService executor;
@@ -44,7 +46,8 @@ public class PinManager implements CellMessageReceiver, LeaderLatchListener, Cel
4446

4547
private long expirationPeriod;
4648
private TimeUnit expirationPeriodUnit;
47-
// Period in which to reset all pins that failed to be unpinned from state FAILED_TO_UNPIN to READY_TO_UNPIN
49+
50+
/* Period in which to reset all pins that failed to be unpinned from state FAILED_TO_UNPIN to READY_TO_UNPIN */
4851
private Duration resetFailedUnpinsPeriod;
4952
private int maxUnpinsPerRun = -1;
5053

@@ -94,11 +97,32 @@ public PnfsDeleteEntryNotificationMessage messageArrived(
9497
return message;
9598
}
9699

100+
/**
101+
* Resets all pins in state PINNING to READY_TO_PIN, because after a PinManager HA master
102+
* change, these are abandoned and need to be retried by the new HA master.
103+
*/
104+
private void resetTransitivePinningStates() {
105+
try {
106+
// TODO: cancel these pins in PoolManager!
107+
dao.update(dao.where()
108+
.state(PINNING),
109+
dao.set()
110+
.state(READY_TO_PIN)
111+
.pool(null));
112+
} catch (JDOException | DataAccessException e) {
113+
LOGGER.error("Database failure while trying to reset failed PINNING: {}",
114+
e.getMessage());
115+
} catch (RuntimeException e) {
116+
LOGGER.error("Unexpected failure while resetting PINNING pins", e);
117+
}
118+
}
119+
97120
/**
98121
* Resets all pins in state UNPINNING and FAILED_TO_UNPIN to READY_TO_UNPIN.
99122
*/
100-
private void markAllExpiredPinsReadyToUnpin() {
123+
private void resetTransitiveUnpinningStates() {
101124
dao.update(dao.where()
125+
.stateIsNot(READY_TO_PIN)
102126
.stateIsNot(PINNING)
103127
.stateIsNot(PINNED)
104128
.stateIsNot(READY_TO_UNPIN),
@@ -107,18 +131,19 @@ private void markAllExpiredPinsReadyToUnpin() {
107131
}
108132

109133
/**
110-
* This task transitions all pins that have exceeded their lifetime and are in state PINNING or
111-
* PINNED to state READY_TO_UNPIN. It removes the pool, which expires pins on its own and does
112-
* not need to be contacted for regular expiries. As PoolManager is aware of the timeout for
113-
* pins in state PINNING, it should also delete the request on its own if it is still ongoing.
134+
* This task transitions all pins that have exceeded their lifetime and are in state
135+
* READY_TO_PIN, PINNING or PINNED to state READY_TO_UNPIN. It removes the pool field, as the
136+
* pool will remove these sticky bits on its own and does not need to be contacted for regular
137+
* expiries. As PoolManager is aware of the timeout for PINNING requests, PoolManager should
138+
* delete these ongoing requests on its own as well.
114139
*/
115140
private class ExpirationTask implements Runnable {
116141

117-
private AtomicInteger count = new AtomicInteger();
142+
private final AtomicInteger count = new AtomicInteger();
118143

119144
@Override
120145
public void run() {
121-
NDC.push("BackgroundExpiration-" + count.incrementAndGet());
146+
NDC.push("PinExpiration-" + count.incrementAndGet());
122147
try {
123148
dao.update(dao.where()
124149
.expirationTimeBefore(new Date())
@@ -145,11 +170,11 @@ public void run() {
145170
*/
146171
private class ResetFailedUnpinsTask implements Runnable {
147172

148-
private AtomicInteger count = new AtomicInteger();
173+
private final AtomicInteger count = new AtomicInteger();
149174

150175
@Override
151176
public void run() {
152-
NDC.push("BackgroundResetFailedUnpins-" + count.incrementAndGet());
177+
NDC.push("UnpinExpiration-" + count.incrementAndGet());
153178
try {
154179
dao.update(dao.where()
155180
.state(FAILED_TO_UNPIN),
@@ -168,30 +193,40 @@ public void run() {
168193
}
169194
}
170195

196+
// TODO: should the PinProcessor be created here or injected? (open question: dao issue)
197+
private PinProcessor pinTask;
171198
private UnpinProcessor unpinTask;
172199
private final ExpirationTask expirationTask = new ExpirationTask();
173200
private final ResetFailedUnpinsTask resetFailedUnpinsTask = new ResetFailedUnpinsTask();
174201

202+
private ScheduledFuture<?> pinFuture;
175203
private ScheduledFuture<?> unpinFuture;
176204
private ScheduledFuture<?> expirationFuture;
177205
private ScheduledFuture<?> resetFailedUnpinsFuture;
178206

179207
public void init() {
180208
// Needs to be assigned after dao has been initialized
209+
pinTask = new PinProcessor(dao, poolStub, poolMonitor, maxUnpinsPerRun);
181210
unpinTask = new UnpinProcessor(dao, poolStub, poolMonitor, maxUnpinsPerRun);
182211
}
183212

184213
@Override
185214
public void isLeader() {
186215
LOGGER.info("Resetting existing intermediate pin states.");
187-
markAllExpiredPinsReadyToUnpin();
216+
resetTransitivePinningStates();
217+
resetTransitiveUnpinningStates();
188218

189219
LOGGER.info("Scheduling Expiration and Unpin tasks.");
190220
expirationFuture = executor.scheduleWithFixedDelay(
191221
new FireAndForgetTask(expirationTask),
192222
INITIAL_EXPIRATION_DELAY,
193223
expirationPeriodUnit.toMillis(expirationPeriod),
194224
MILLISECONDS);
225+
pinFuture = executor.scheduleWithFixedDelay(
226+
new FireAndForgetTask(pinTask),
227+
INITIAL_PIN_DELAY,
228+
expirationPeriodUnit.toMillis(expirationPeriod),
229+
MILLISECONDS);
195230
unpinFuture = executor.scheduleWithFixedDelay(
196231
new FireAndForgetTask(unpinTask),
197232
INITIAL_UNPIN_DELAY,
@@ -208,16 +243,18 @@ public void isLeader() {
208243
public void notLeader() {
209244
LOGGER.info("Cancelling Expiration, ResetFailedUnpins and Unpin tasks.");
210245
expirationFuture.cancel(false);
246+
pinFuture.cancel(true);
211247
unpinFuture.cancel(true);
212248
resetFailedUnpinsFuture.cancel(true);
213249
}
214250

215251
@Override
216252
public void getInfo(PrintWriter pw) {
217-
pw.printf("Expiration and unpin period: %s %s\n", expirationPeriod,
253+
pw.printf("Period for expiration and unpinning: %s %s\n", expirationPeriod,
218254
expirationPeriodUnit);
219-
pw.printf("Reset pins that failed to unpin period: %s\n",
255+
pw.printf("Period for pinning: %s %s\n", expirationPeriod, expirationPeriodUnit);
256+
pw.printf("Max. unpin operations per run: %s\n", maxUnpinsPerRun);
257+
pw.printf("Period for resetting pins that failed to unpin: %s\n",
220258
TimeUtils.describe(resetFailedUnpinsPeriod).orElse("-"));
221-
pw.printf("Max unpin operations per run: %s\n", maxUnpinsPerRun);
222259
}
223260
}

modules/dcache/src/main/java/org/dcache/pinmanager/PinManagerCLI.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,10 @@ public class PinManagerCLI
5454

5555
private PnfsHandler _pnfs;
5656
private PoolMonitor _poolMonitor;
57-
private PinManager _pinManager;
5857
private PinDao _dao;
59-
private PinRequestProcessor _pinProcessor;
60-
private UnpinRequestProcessor _unpinProcessor;
61-
private MovePinRequestProcessor _moveProcessor;
58+
private PinRequestProcessor _pinRequestProcessor;
59+
private UnpinRequestProcessor _unpinRequestProcessor;
60+
private MovePinRequestProcessor _moveRequestProcessor;
6261

6362
@Required
6463
public void setPnfsStub(CellStub stub) {
@@ -70,24 +69,19 @@ public void setPoolMonitor(PoolMonitor poolMonitor) {
7069
_poolMonitor = poolMonitor;
7170
}
7271

73-
@Required
74-
public void setPinManager(PinManager pinManager) {
75-
_pinManager = pinManager;
76-
}
77-
7872
@Required
7973
public void setPinProcessor(PinRequestProcessor processor) {
80-
_pinProcessor = processor;
74+
_pinRequestProcessor = processor;
8175
}
8276

8377
@Required
8478
public void setUnpinProcessor(UnpinRequestProcessor processor) {
85-
_unpinProcessor = processor;
79+
_unpinRequestProcessor = processor;
8680
}
8781

8882
@Required
8983
public void setMoveProcessor(MovePinRequestProcessor processor) {
90-
_moveProcessor = processor;
84+
_moveRequestProcessor = processor;
9185
}
9286

9387
@Required
@@ -103,7 +97,7 @@ public void setDao(PinDao dao) {
10397
PinManagerPinMessage message = new PinManagerPinMessage(FileAttributes.ofPnfsId(pnfsId),
10498
protocolInfo, requestId, lifetime);
10599
message.setReplyWhenStarted(replyWhenStarted);
106-
return _pinProcessor.messageArrived(message);
100+
return _pinRequestProcessor.messageArrived(message);
107101
}
108102

109103
@Command(name = "pin", hint = "pin a file to disk",
@@ -149,7 +143,7 @@ public String call() throws NumberFormatException, CacheException {
149143
if (!pin.equals("*")) {
150144
message.setPinId(Long.parseLong(pin));
151145
}
152-
_unpinProcessor.messageArrived(message);
146+
_unpinRequestProcessor.messageArrived(message);
153147
return "The pin is now scheduled for removal";
154148
}
155149
}
@@ -179,7 +173,7 @@ public String call() throws CacheException, InterruptedException {
179173
_pnfs.getFileAttributes(pnfsId, attributes);
180174
PinManagerExtendPinMessage message =
181175
new PinManagerExtendPinMessage(fileAttributes, pin, millis);
182-
message = _moveProcessor.messageArrived(message);
176+
message = _moveRequestProcessor.messageArrived(message);
183177
if (message.getExpirationTime() == null) {
184178
return String.format("[%d] %s pinned",
185179
message.getPinId(), pnfsId);
@@ -400,7 +394,7 @@ public String call() throws IOException {
400394
try {
401395
PinManagerUnpinMessage message =
402396
new PinManagerUnpinMessage(pnfsId);
403-
_unpinProcessor.messageArrived(message);
397+
_unpinRequestProcessor.messageArrived(message);
404398
} catch (CacheException e) {
405399
out.append(pnfsId).append(": ").append(e.getMessage()).append('\n');
406400
}
@@ -544,7 +538,7 @@ public synchronized void cancel()
544538
PinManagerUnpinMessage message =
545539
new PinManagerUnpinMessage(pnfsId);
546540
message.setRequestId(_requestId);
547-
_unpinProcessor.messageArrived(message);
541+
_unpinRequestProcessor.messageArrived(message);
548542
}
549543
_cancelled = true;
550544
}

modules/dcache/src/main/java/org/dcache/pinmanager/PinManagerPinMessage.java

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
import diskCacheV111.vehicles.Message;
88
import diskCacheV111.vehicles.PoolMgrSelectReadPoolMsg;
99
import diskCacheV111.vehicles.ProtocolInfo;
10-
1110
import java.io.ObjectStreamException;
1211
import java.util.Date;
1312
import java.util.EnumSet;
13+
import java.util.Optional;
14+
import jline.internal.Nullable;
1415
import org.dcache.auth.attributes.Restriction;
1516
import org.dcache.auth.attributes.Restrictions;
1617
import org.dcache.namespace.FileAttribute;
@@ -29,8 +30,8 @@ public class PinManagerPinMessage extends Message {
2930
private String _pool;
3031
private final String _requestId;
3132
private Date _expirationTime;
32-
private boolean _replyWhenStarted;
3333
private boolean _denyStaging;
34+
private boolean _pinningInProgress;
3435

3536
public PinManagerPinMessage(FileAttributes fileAttributes,
3637
ProtocolInfo protocolInfo,
@@ -49,26 +50,6 @@ public PinManagerPinMessage(FileAttributes fileAttributes,
4950
String requestId,
5051
long lifetime) {
5152
this(fileAttributes, protocolInfo, Restrictions.none(), requestId, lifetime);
52-
}
53-
54-
/**
55-
* Choose whether to wait for the pin to be established before returning. If value is true then
56-
* do not wait for file to be staged, but return straight away. Calling getPool, getPinId and
57-
* getExpirationTime will all return null, unless this request is a retry and the original
58-
* request succeeded (have establishing a pin) in which case the details of that pin are
59-
* available through those methods.
60-
* <p>
61-
* If set to false (the default) then the message returns once the pin request has been
62-
* processed and the pin established, or if there was an error.
63-
*
64-
* @param value whether to reply after the pinning task has been started.
65-
*/
66-
public void setReplyWhenStarted(boolean value) {
67-
_replyWhenStarted = value;
68-
}
69-
70-
public boolean isReplyWhenStarted() {
71-
return _replyWhenStarted;
7253
}
7354

7455
public void setDenyStaging(boolean value) {
@@ -107,8 +88,11 @@ public ProtocolInfo getProtocolInfo() {
10788
return _protocolInfo;
10889
}
10990

110-
public Restriction getRestriction() { return _restriction; }
91+
public Restriction getRestriction() {
92+
return _restriction;
93+
}
11194

95+
@Nullable
11296
public String getPool() {
11397
return _pool;
11498
}
@@ -117,14 +101,22 @@ public void setPool(String pool) {
117101
_pool = pool;
118102
}
119103

120-
public long getPinId() {
121-
return _pinId;
104+
public Optional<Long> getPinId() {
105+
return _pinId == 0 ? Optional.empty() : Optional.of(_pinId);
122106
}
123107

124108
public void setPinId(long pinId) {
125109
_pinId = pinId;
126110
}
127111

112+
public void setPinningInProgress(boolean pinningInProgress) {
113+
_pinningInProgress = pinningInProgress;
114+
}
115+
116+
public boolean isPinningInProgress() {
117+
return _pinningInProgress;
118+
}
119+
128120
public void setExpirationTime(Date expirationTime) {
129121
_expirationTime = expirationTime;
130122
}
@@ -134,15 +126,16 @@ public Date getExpirationTime() {
134126
}
135127

136128
public void setPin(Pin pin) {
129+
_pinningInProgress = false;
137130
setPool(pin.getPool());
138131
setPinId(pin.getPinId());
139132
setExpirationTime(pin.getExpirationTime());
140133
}
141134

142135
@Override
143136
public String toString() {
144-
return "PinManagerPinMessage[" + _fileAttributes + "," +
145-
_protocolInfo + "," + _lifetime + "]";
137+
return "PinManagerPinMessage[" + _fileAttributes + "," + _protocolInfo + "," + _lifetime
138+
+ "]";
146139
}
147140

148141
public static EnumSet<FileAttribute> getRequiredAttributes() {
@@ -152,7 +145,9 @@ public static EnumSet<FileAttribute> getRequiredAttributes() {
152145
}
153146

154147
private Object readResolve() throws ObjectStreamException {
155-
if (_restriction == null) { _restriction = Restrictions.none(); }
148+
if (_restriction == null) {
149+
_restriction = Restrictions.none();
150+
}
156151
return this;
157152
}
158153

0 commit comments

Comments
 (0)