1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2017 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "linked_list.h"
19
20 #include "stats.h"
21
22 #include <mutex>
23
BasicLinkedList(uint16_t vbucketId, EPStats& st)24 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
25 : SequenceList(),
26 readRange(0, 0),
27 staleSize(0),
28 staleMetaDataSize(0),
29 highSeqno(0),
30 highestDedupedSeqno(0),
31 highestPurgedDeletedSeqno(0),
32 numStaleItems(0),
33 numDeletedItems(0),
34 vbid(vbucketId),
35 st(st),
36 pausedPurgePoint(seqList.end()) {
37 }
38
~BasicLinkedList()39 BasicLinkedList::~BasicLinkedList() {
40 /* Delete stale items here, other items are deleted by the hash
41 table */
42 std::lock_guard<std::mutex> writeGuard(getListWriteLock());
43 seqList.remove_and_dispose_if(
44 [&writeGuard](const OrderedStoredValue& v) {
45 return v.isStale(writeGuard);
46 },
47 [this](OrderedStoredValue* v) {
48 this->st.coreLocal.get()->currentSize.fetch_sub(
49 v->metaDataSize());
50 delete v;
51 });
52
53 /* Erase all the list elements (does not destroy elements, just removes
54 them from the list) */
55 seqList.clear();
56 }
57
appendToList(std::lock_guard<std::mutex>& seqLock, std::lock_guard<std::mutex>& writeLock, OrderedStoredValue& v)58 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
59 std::lock_guard<std::mutex>& writeLock,
60 OrderedStoredValue& v) {
61 seqList.push_back(v);
62 }
63
updateListElem( std::lock_guard<std::mutex>& seqLock, std::lock_guard<std::mutex>& writeLock, OrderedStoredValue& v)64 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
65 std::lock_guard<std::mutex>& seqLock,
66 std::lock_guard<std::mutex>& writeLock,
67 OrderedStoredValue& v) {
68 /* Lock that needed for consistent read of SeqRange 'readRange' */
69 std::lock_guard<SpinLock> lh(rangeLock);
70
71 if (readRange.fallsInRange(v.getBySeqno())) {
72 /* Range read is in middle of a point-in-time snapshot, hence we cannot
73 move the element to the end of the list. Return a temp failure */
74 return UpdateStatus::Append;
75 }
76
77 /* Since there is no other reads or writes happenning in this range, we can
78 move the item to the end of the list */
79 auto it = seqList.iterator_to(v);
80 /* If the list is being updated at 'pausedPurgePoint', then we must save
81 the new 'pausedPurgePoint' */
82 if (pausedPurgePoint == it) {
83 pausedPurgePoint = seqList.erase(it);
84 } else {
85 seqList.erase(it);
86 }
87 seqList.push_back(v);
88
89 return UpdateStatus::Success;
90 }
91
92 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
rangeRead(seqno_t start, seqno_t end)93 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
94 if ((start > end) || (start <= 0)) {
95 LOG(EXTENSION_LOG_WARNING,
96 "BasicLinkedList::rangeRead(): "
97 "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
98 vbid,
99 start,
100 end);
101 return std::make_tuple(ENGINE_ERANGE, std::vector<UniqueItemPtr>(), 0);
102 }
103
104 /* Allows only 1 rangeRead for now */
105 std::lock_guard<std::mutex> lckGd(rangeReadLock);
106
107 {
108 std::lock_guard<std::mutex> listWriteLg(getListWriteLock());
109 std::lock_guard<SpinLock> lh(rangeLock);
110 if (start > highSeqno) {
111 LOG(EXTENSION_LOG_WARNING,
112 "BasicLinkedList::rangeRead(): "
113 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
114 vbid,
115 start,
116 static_cast<seqno_t>(highSeqno));
117 /* If the request is for an invalid range, return before iterating
118 through the list */
119 return std::make_tuple(
120 ENGINE_ERANGE, std::vector<UniqueItemPtr>(), 0);
121 }
122
123 /* Mark the initial read range */
124 end = std::min(end, static_cast<seqno_t>(highSeqno));
125 end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
126 readRange = SeqRange(1, end);
127 }
128
129 /* Read items in the range */
130 std::vector<UniqueItemPtr> items;
131
132 for (const auto& osv : seqList) {
133 int64_t currSeqno(osv.getBySeqno());
134
135 if (currSeqno > end || currSeqno < 0) {
136 /* We have read all the items in the requested range, or the osv
137 * does not yet have a valid seqno; either way we are done */
138 break;
139 }
140
141 {
142 std::lock_guard<SpinLock> lh(rangeLock);
143 readRange.setBegin(currSeqno); /* [EPHE TODO]: should we
144 update the min every time ?
145 */
146 }
147
148 if (currSeqno < start) {
149 /* skip this item */
150 continue;
151 }
152
153 /* Check if this OSV has been made stale and has been superseded by a
154 * newer version. If it has, and the replacement is /also/ in the range
155 * we are reading, we should skip this item to avoid duplicates */
156 StoredValue* replacement;
157 {
158 std::lock_guard<std::mutex> writeGuard(getListWriteLock());
159 replacement = osv.getReplacementIfStale(writeGuard);
160 }
161
162 if (replacement &&
163 replacement->toOrderedStoredValue()->getBySeqno() <= end) {
164 continue;
165 }
166
167 try {
168 items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
169 } catch (const std::bad_alloc&) {
170 /* [EPHE TODO]: Do we handle backfill in a better way ?
171 Perhaps do backfilling partially (that is
172 backfill ==> stream; backfill ==> stream ..so on )?
173 */
174 LOG(EXTENSION_LOG_WARNING,
175 "BasicLinkedList::rangeRead(): "
176 "(vb %d) ENOMEM while trying to copy "
177 "item with seqno %" PRIi64 "before streaming it",
178 vbid,
179 currSeqno);
180 return std::make_tuple(
181 ENGINE_ENOMEM, std::vector<UniqueItemPtr>(), 0);
182 }
183 }
184
185 /* Done with range read, reset the range */
186 {
187 std::lock_guard<SpinLock> lh(rangeLock);
188 readRange.reset();
189 }
190
191 /* Return all the range read items */
192 return std::make_tuple(ENGINE_SUCCESS, std::move(items), end);
193 }
194
updateHighSeqno(std::lock_guard<std::mutex>& listWriteLg, const OrderedStoredValue& v)195 void BasicLinkedList::updateHighSeqno(std::lock_guard<std::mutex>& listWriteLg,
196 const OrderedStoredValue& v) {
197 if (v.getBySeqno() < 1) {
198 throw std::invalid_argument("BasicLinkedList::updateHighSeqno(): vb " +
199 std::to_string(vbid) +
200 "; Cannot set the highSeqno to a value " +
201 std::to_string(v.getBySeqno()) +
202 " which is < 1");
203 }
204 highSeqno = v.getBySeqno();
205 }
updateHighestDedupedSeqno( std::lock_guard<std::mutex>& listWriteLg, const OrderedStoredValue& v)206 void BasicLinkedList::updateHighestDedupedSeqno(
207 std::lock_guard<std::mutex>& listWriteLg, const OrderedStoredValue& v) {
208 if (v.getBySeqno() < 1) {
209 throw std::invalid_argument(
210 "BasicLinkedList::updateHighestDedupedSeqno(): vb " +
211 std::to_string(vbid) +
212 "; Cannot set the highestDedupedSeqno to "
213 "a value " +
214 std::to_string(v.getBySeqno()) + " which is < 1");
215 }
216 highestDedupedSeqno = v.getBySeqno();
217 }
218
markItemStale(std::lock_guard<std::mutex>& listWriteLg, StoredValue::UniquePtr ownedSv, StoredValue* newSv)219 void BasicLinkedList::markItemStale(std::lock_guard<std::mutex>& listWriteLg,
220 StoredValue::UniquePtr ownedSv,
221 StoredValue* newSv) {
222 /* Release the StoredValue as BasicLinkedList does not want it to be of
223 owned type */
224 StoredValue* v = ownedSv.release().get();
225
226 /* Update the stats tracking the memory owned by the list */
227 staleSize.fetch_add(v->size());
228 staleMetaDataSize.fetch_add(v->metaDataSize());
229 st.coreLocal.get()->currentSize.fetch_add(v->metaDataSize());
230
231 ++numStaleItems;
232 v->toOrderedStoredValue()->markStale(listWriteLg, newSv);
233 }
234
purgeTombstones(seqno_t purgeUpToSeqno, std::function<bool()> shouldPause)235 size_t BasicLinkedList::purgeTombstones(seqno_t purgeUpToSeqno,
236 std::function<bool()> shouldPause) {
237 // Purge items marked as stale from the seqList.
238 //
239 // Strategy - we try to ensure that this function does not block
240 // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
241 // To achieve this (safely),
242 // we (try to) acquire the rangeReadLock and setup a 'read' range for the
243 // whole of the seqList. This prevents any other readers from iterating
244 // the list (and accessing stale items) while we purge on it; but permits
245 // front-end operations to continue as they:
246 // a) Only read/modify non-stale items (we only change stale items) and
247 // b) Do not change the list membership of anything within the read-range.
248 // However, we do need to be careful about what members of OSVs we access
249 // here - the only OSVs we can safely access are ones marked stale as they
250 // are no longer in the HashTable (and hence subject to HashTable locks).
251 // To check if an item is stale we need to acquire the writeLock
252 // (OSV::stale is guarded by it) for each list item. While this isn't
253 // ideal (that's the same lock needed by front-end operations), we can
254 // release the lock between each element so front-end operations can
255 // have the opportunity to acquire it.
256 //
257 // Attempt to acquire the readRangeLock, to block anyone else concurrently
258 // reading from the list while we remove elements from it.
259 std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
260 if (!rrGuard) {
261 // If we cannot acquire the lock then another thread is
262 // running a range read. Given these are typically long-running,
263 // return without blocking.
264 return 0;
265 }
266
267 // Determine the start and end iterators.
268 OrderedLL::iterator startIt;
269 {
270 std::lock_guard<std::mutex> writeGuard(getListWriteLock());
271 if (seqList.empty()) {
272 // Nothing in sequence list - nothing to purge.
273 return 0;
274 }
275
276 // Determine the start
277 if (pausedPurgePoint != seqList.end()) {
278 // resume
279 startIt = pausedPurgePoint;
280 pausedPurgePoint = seqList.end();
281 } else {
282 startIt = seqList.begin();
283 }
284 if (startIt->getBySeqno() > purgeUpToSeqno) {
285 /* Nothing to purge */
286 return 0;
287 }
288
289 // Update readRange
290 std::lock_guard<SpinLock> rangeGuard(rangeLock);
291 readRange = SeqRange(startIt->getBySeqno(), purgeUpToSeqno);
292 }
293
294 // Iterate across all but the last item in the seqList, looking
295 // for stale items.
296 size_t purgedCount = 0;
297 bool stale;
298 for (auto it = startIt; it != seqList.end();) {
299 if ((it->getBySeqno() > purgeUpToSeqno) ||
300 (it->getBySeqno() <= 0) /* last item with no valid seqno yet */) {
301 break;
302 }
303
304 {
305 // As we move past the items in the list, increment the begin of
306 // 'readRange' to reduce the window of creating stale items during
307 // updates
308 std::lock_guard<SpinLock> rangeGuard(rangeLock);
309 readRange.setBegin(it->getBySeqno());
310 }
311
312 {
313 std::lock_guard<std::mutex> writeGuard(getListWriteLock());
314 stale = it->isStale(writeGuard);
315 }
316 // Only stale items are purged.
317 if (!stale) {
318 ++it;
319 } else {
320 // Checks pass, remove from list and delete.
321 it = purgeListElem(it);
322 ++purgedCount;
323 }
324
325 if (shouldPause()) {
326 pausedPurgePoint = it;
327 break;
328 }
329 }
330
331 // Complete; reset the readRange.
332 {
333 std::lock_guard<SpinLock> lh(rangeLock);
334 readRange.reset();
335 }
336 return purgedCount;
337 }
338
updateNumDeletedItems(bool oldDeleted, bool newDeleted)339 void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
340 if (oldDeleted && !newDeleted) {
341 --numDeletedItems;
342 } else if (!oldDeleted && newDeleted) {
343 ++numDeletedItems;
344 }
345 }
346
getNumStaleItems() const347 uint64_t BasicLinkedList::getNumStaleItems() const {
348 return numStaleItems;
349 }
350
getStaleValueBytes() const351 size_t BasicLinkedList::getStaleValueBytes() const {
352 return staleSize;
353 }
354
getStaleMetadataBytes() const355 size_t BasicLinkedList::getStaleMetadataBytes() const {
356 return staleMetaDataSize;
357 }
358
getNumDeletedItems() const359 uint64_t BasicLinkedList::getNumDeletedItems() const {
360 std::lock_guard<std::mutex> lckGd(getListWriteLock());
361 return numDeletedItems;
362 }
363
getNumItems() const364 uint64_t BasicLinkedList::getNumItems() const {
365 std::lock_guard<std::mutex> lckGd(getListWriteLock());
366 return seqList.size();
367 }
368
getHighSeqno() const369 uint64_t BasicLinkedList::getHighSeqno() const {
370 std::lock_guard<std::mutex> lckGd(getListWriteLock());
371 return highSeqno;
372 }
373
getHighestDedupedSeqno() const374 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
375 std::lock_guard<std::mutex> lckGd(getListWriteLock());
376 return highestDedupedSeqno;
377 }
378
getHighestPurgedDeletedSeqno() const379 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
380 return highestPurgedDeletedSeqno;
381 }
382
getRangeReadBegin() const383 uint64_t BasicLinkedList::getRangeReadBegin() const {
384 std::lock_guard<SpinLock> lh(rangeLock);
385 return readRange.getBegin();
386 }
387
getRangeReadEnd() const388 uint64_t BasicLinkedList::getRangeReadEnd() const {
389 std::lock_guard<SpinLock> lh(rangeLock);
390 return readRange.getEnd();
391 }
getListWriteLock() const392 std::mutex& BasicLinkedList::getListWriteLock() const {
393 return writeLock;
394 }
395
makeRangeIterator( bool isBackfill)396 boost::optional<SequenceList::RangeIterator> BasicLinkedList::makeRangeIterator(
397 bool isBackfill) {
398 auto pRangeItr = RangeIteratorLL::create(*this, isBackfill);
399 return pRangeItr ? RangeIterator(std::move(pRangeItr))
400 : boost::optional<SequenceList::RangeIterator>{};
401 }
402
dump() const403 void BasicLinkedList::dump() const {
404 std::cerr << *this << std::endl;
405 }
406
operator <<(std::ostream& os, const BasicLinkedList& ll)407 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
408 os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
409 << " deletedItems:" << ll.numDeletedItems
410 << " staleItems:" << ll.getNumStaleItems()
411 << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
412 << " elements:[" << std::endl;
413 size_t count = 0;
414 for (const auto& val : ll.seqList) {
415 os << " " << val << std::endl;
416 ++count;
417 }
418 os << "] (count:" << count << ")";
419 return os;
420 }
421
purgeListElem(OrderedLL::iterator it)422 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
423 StoredValue::UniquePtr purged(&*it);
424 {
425 std::lock_guard<std::mutex> lckGd(getListWriteLock());
426 it = seqList.erase(it);
427 }
428
429 /* Update the stats tracking the memory owned by the list */
430 staleSize.fetch_sub(purged->size());
431 staleMetaDataSize.fetch_sub(purged->metaDataSize());
432 st.coreLocal.get()->currentSize.fetch_sub(purged->metaDataSize());
433
434 // Similary for the item counts:
435 --numStaleItems;
436 if (purged->isDeleted()) {
437 --numDeletedItems;
438 }
439
440 if (purged->isDeleted()) {
441 highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
442 purged->getBySeqno());
443 }
444 return it;
445 }
446
447 std::unique_ptr<BasicLinkedList::RangeIteratorLL>
create(BasicLinkedList& ll, bool isBackfill)448 BasicLinkedList::RangeIteratorLL::create(BasicLinkedList& ll, bool isBackfill) {
449 /* Note: cannot use std::make_unique because the constructor of
450 RangeIteratorLL is private */
451 std::unique_ptr<BasicLinkedList::RangeIteratorLL> pRangeItr(
452 new BasicLinkedList::RangeIteratorLL(ll, isBackfill));
453 return pRangeItr->tryLater() ? nullptr : std::move(pRangeItr);
454 }
455
RangeIteratorLL(BasicLinkedList& ll, bool isBackfill)456 BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll,
457 bool isBackfill)
458 : list(ll),
459 /* Try to get range read lock, do not block */
460 readLockHolder(list.rangeReadLock, std::try_to_lock),
461 itrRange(0, 0),
462 numRemaining(0),
463 earlySnapShotEndSeqno(0),
464 isBackfill(isBackfill) {
465 if (!readLockHolder) {
466 /* no blocking */
467 return;
468 }
469
470 std::lock_guard<std::mutex> listWriteLg(list.getListWriteLock());
471 std::lock_guard<SpinLock> lh(list.rangeLock);
472 if (list.highSeqno < 1) {
473 /* No need of holding a lock for the snapshot as there are no items;
474 Also iterator range is at default (0, 0) */
475 readLockHolder.unlock();
476 return;
477 }
478
479 /* Iterator to the beginning of linked list */
480 currIt = list.seqList.begin();
481
482 /* Number of items that can be iterated over */
483 numRemaining = list.seqList.size();
484
485 /* The minimum seqno in the iterator that must be read to get a consistent
486 read snapshot */
487 earlySnapShotEndSeqno = list.highestDedupedSeqno;
488
489 /* Mark the snapshot range on linked list. The range that can be read by the
490 iterator is inclusive of the start and the end. */
491 list.readRange =
492 SeqRange(currIt->getBySeqno(), list.seqList.back().getBySeqno());
493
494 /* Keep the range in the iterator obj. We store the range end seqno as one
495 higher than the end seqno that can be read by this iterator.
496 This is because, we must identify the end point of the iterator, and
497 we the read is inclusive of the end points of list.readRange.
498
499 Further, since use the class 'SeqRange' for 'itrRange' we cannot use
500 curr() == end() + 1 to identify the end point because 'SeqRange' does
501 not internally allow curr > end */
502 itrRange = SeqRange(currIt->getBySeqno(),
503 list.seqList.back().getBySeqno() + 1);
504
505 EXTENSION_LOG_LEVEL severity =
506 isBackfill ? EXTENSION_LOG_NOTICE : EXTENSION_LOG_INFO;
507 LOG(severity,
508 "vb:%" PRIu16 " Created range iterator from %" PRIi64 "to %" PRIi64,
509 list.vbid,
510 curr(),
511 end());
512 }
513
~RangeIteratorLL()514 BasicLinkedList::RangeIteratorLL::~RangeIteratorLL() {
515 std::lock_guard<SpinLock> lh(list.rangeLock);
516 if (readLockHolder.owns_lock()) {
517 /* we must reset the list readRange only if the list iterator still owns
518 the read lock on the list */
519 list.readRange.reset();
520 EXTENSION_LOG_LEVEL severity =
521 isBackfill ? EXTENSION_LOG_NOTICE : EXTENSION_LOG_INFO;
522 LOG(severity, "vb:%" PRIu16 " Releasing the range iterator", list.vbid);
523 }
524 /* As readLockHolder goes out of scope here, it will automatically release
525 the snapshot read lock on the linked list */
526 }
527
operator *() const528 OrderedStoredValue& BasicLinkedList::RangeIteratorLL::operator*() const {
529 if (curr() >= end()) {
530 /* We can't read beyond the range end */
531 throw std::out_of_range(
532 "BasicLinkedList::RangeIteratorLL::operator*()"
533 ": Trying to read beyond range end seqno " +
534 std::to_string(end()));
535 }
536 return *currIt;
537 }
538
539 BasicLinkedList::RangeIteratorLL& BasicLinkedList::RangeIteratorLL::
operator ++()540 operator++() {
541 do {
542 incrOperatorHelper();
543 if (curr() == end()) {
544 /* iterator has gone beyond the range, just return */
545 return *this;
546 }
547 } while (itrRangeContainsAnUpdatedVersion());
548 return *this;
549 }
550
incrOperatorHelper()551 void BasicLinkedList::RangeIteratorLL::incrOperatorHelper() {
552 if (curr() >= end()) {
553 throw std::out_of_range(
554 "BasicLinkedList::RangeIteratorLL::operator++()"
555 ": Trying to move the iterator beyond range end"
556 " seqno " +
557 std::to_string(end()));
558 }
559
560 --numRemaining;
561
562 /* Check if the iterator is pointing to the last element. Increment beyond
563 the last element indicates the end of the iteration */
564 if (curr() == itrRange.getEnd() - 1) {
565 std::lock_guard<SpinLock> lh(list.rangeLock);
566 /* We reset the range and release the readRange lock here so that any
567 iterator client that does not delete the iterator obj will not end up
568 holding the list readRange lock forever */
569 list.readRange.reset();
570 EXTENSION_LOG_LEVEL severity =
571 isBackfill ? EXTENSION_LOG_NOTICE : EXTENSION_LOG_INFO;
572 LOG(severity, "vb:%" PRIu16 " Releasing the range iterator", list.vbid);
573 readLockHolder.unlock();
574
575 /* Update the begin to end() so the client can see that the iteration
576 has ended */
577 itrRange.setBegin(end());
578 return;
579 }
580
581 ++currIt;
582 {
583 /* As the iterator moves we reduce the snapshot range being read on the
584 linked list. This helps reduce the stale items in the list during
585 heavy update load from the front end */
586 std::lock_guard<SpinLock> lh(list.rangeLock);
587 list.readRange.setBegin(currIt->getBySeqno());
588 }
589
590 /* Also update the current range stored in the iterator obj */
591 itrRange.setBegin(currIt->getBySeqno());
592 }
593
itrRangeContainsAnUpdatedVersion()594 bool BasicLinkedList::RangeIteratorLL::itrRangeContainsAnUpdatedVersion() {
595 /* Check if this OSV has been made stale and has been superseded by a
596 newer version. If it has, and the replacement is /also/ in the range
597 we are reading, we should skip this item to avoid duplicates */
598 StoredValue* replacement;
599 {
600 /* Writer and tombstone purger hold the 'list.writeLock' when they
601 change the pointer to the replacement OSV, and hence it would not be
602 safe to read the uniquePtr without preventing concurrent changes to
603 it */
604 std::lock_guard<std::mutex> writeGuard(list.getListWriteLock());
605 replacement = (*(*this)).getReplacementIfStale(writeGuard);
606 }
607 return (replacement != nullptr && replacement->getBySeqno() <= back());
608 }
609