Skip to content

Commit d35355a

Browse files
author
Jacob Maes
committed
SAMZA-1480: TaskStorageManager improperly initializes changelog consumer position when restoring a store from disk
Author: Jacob Maes <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes apache#350 from jmakes/samza-1480
1 parent 4269d62 commit d35355a

File tree

3 files changed

+234
-46
lines changed

3 files changed

+234
-46
lines changed

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class TaskStorageManager(
6868
}
6969

7070
var changeLogOldestOffsets: Map[SystemStream, String] = Map()
71-
val fileOffset: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
71+
val fileOffsets: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
7272
val offsetFileName = "OFFSET"
7373

7474
def apply(storageEngineName: String) = taskStores(storageEngineName)
@@ -104,7 +104,9 @@ class TaskStorageManager(
104104
} else {
105105
val offset = readOffsetFile(loggedStorePartitionDir)
106106
info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
107-
fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
107+
if (offset != null) {
108+
fileOffsets.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
109+
}
108110
}
109111
})
110112
}
@@ -210,7 +212,7 @@ class TaskStorageManager(
210212
for ((storeName, systemStream) <- changeLogSystemStreams) {
211213
val systemAdmin = systemAdmins
212214
.getOrElse(systemStream.getSystem,
213-
throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
215+
throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
214216
val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
215217

216218
systemAdmin.validateStream(changelogSpec)
@@ -228,12 +230,11 @@ class TaskStorageManager(
228230

229231
for ((storeName, systemStream) <- changeLogSystemStreams) {
230232
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
233+
val admin = systemAdmins.getOrElse(systemStream.getSystem,
234+
throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
231235
val consumer = storeConsumers(storeName)
232-
val offset =
233-
Option(fileOffset.get(systemStreamPartition))
234-
.getOrElse(changeLogOldestOffsets
235-
.getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition)))
236236

237+
val offset = getStartingOffset(systemStreamPartition, admin)
237238
if (offset != null) {
238239
info("Registering change log consumer with offset %s for %s." format (offset, systemStreamPartition))
239240
consumer.register(systemStreamPartition, offset)
@@ -246,6 +247,43 @@ class TaskStorageManager(
246247
storeConsumers.values.foreach(_.start)
247248
}
248249

250+
/**
 * Returns the offset with which the changelog consumer should be initialized for the given SystemStreamPartition.
 *
 * If a file offset exists, it represents the last changelog offset which is also reflected in the on-disk state.
 * In that case, we use the next offset after the file offset, as long as it is not older than the oldest offset
 * currently available in the stream.
 *
 * If there isn't a file offset, if the next offset is older than the oldest available offset, or if the next
 * offset cannot be determined or compared, we simply start with the oldest.
 *
 * @param systemStreamPartition the changelog partition for which the offset is needed.
 * @param admin the [[SystemAdmin]] for the changelog.
 * @return the offset from which the changelog consumer should be initialized.
 * @throws SamzaException if no oldest offset is known for the changelog stream.
 */
private def getStartingOffset(systemStreamPartition: SystemStreamPartition, admin: SystemAdmin): String = {
  val oldestOffset = changeLogOldestOffsets
    .getOrElse(systemStreamPartition.getSystemStream,
      throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition))

  Option(fileOffsets.get(systemStreamPartition)) match {
    case Some(fileOffset) =>
      // File offset was the last message written to the changelog that is also reflected in the store,
      // so we start with the NEXT offset
      val resumeOffset = admin.getOffsetsAfter(Map(systemStreamPartition -> fileOffset).asJava).get(systemStreamPartition)

      // Both the resume offset and the comparator result are Java references that may be null
      // (missing map entry / offsets that are not comparable). Wrapping them avoids an NPE when the
      // boxed Integer is unboxed for the numeric comparison.
      val comparison = Option(resumeOffset).flatMap(resume => Option(admin.offsetComparator(oldestOffset, resume)))

      comparison match {
        case Some(result) if result <= 0 =>
          resumeOffset
        case Some(_) =>
          // If the offset we plan to use is older than the oldest offset, just use the oldest offset.
          // This can happen with changelogs configured with a TTL cleanup policy
          warn(s"Local store offset $resumeOffset is lower than the oldest offset $oldestOffset of the changelog. " +
            s"The values between these offsets cannot be restored.")
          oldestOffset
        case None =>
          // Replaying from the oldest offset is the conservative choice when the resume position is unknown.
          warn(s"Unable to compare resume offset $resumeOffset with the oldest offset $oldestOffset for " +
            s"$systemStreamPartition. Falling back to the oldest offset.")
          oldestOffset
      }

    case None =>
      oldestOffset
  }
}
286+
249287
private def restoreStores() {
250288
debug("Restoring stores.")
251289

@@ -298,7 +336,7 @@ class TaskStorageManager(
298336
for ((storeName, systemStream) <- changeLogSystemStreams.filterKeys(storeName => persistedStores.contains(storeName))) {
299337
val systemAdmin = systemAdmins
300338
.getOrElse(systemStream.getSystem,
301-
throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
339+
throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
302340

303341
debug("Fetching newest offset for store %s" format(storeName))
304342
try {

samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala

Lines changed: 172 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,16 @@
2020
package org.apache.samza.storage
2121

2222

23-
import java.io.File
23+
import java.io.{File, FileOutputStream, ObjectOutputStream}
2424
import java.util
2525

2626
import org.apache.samza.Partition
27-
import org.apache.samza.config.MapConfig
28-
import org.apache.samza.config.StorageConfig
27+
import org.apache.samza.config.{MapConfig, StorageConfig}
2928
import org.apache.samza.container.TaskName
3029
import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
3130
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
3231
import org.apache.samza.system._
33-
import org.apache.samza.util.SystemClock
34-
import org.apache.samza.util.Util
32+
import org.apache.samza.util.{SystemClock, Util}
3533
import org.junit.Assert._
3634
import org.junit.{After, Before, Test}
3735
import org.mockito.Matchers._
@@ -77,19 +75,7 @@ class TestTaskStorageManager extends MockitoSugar {
7775
val storeFile = new File(storeDirectory, "store.sst")
7876
val offsetFile = new File(storeDirectory, "OFFSET")
7977

80-
// getStoreProperties should always return the same StoreProperties
81-
val mockStorageEngine = mock[StorageEngine]
82-
when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] {
83-
override def answer(invocation: InvocationOnMock): StoreProperties = {
84-
new StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(true).build()
85-
}
86-
})
87-
// Restore simply creates the file
88-
when(mockStorageEngine.restore(any())).thenAnswer(new Answer[Unit] {
89-
override def answer(invocation: InvocationOnMock): Unit = {
90-
storeFile.createNewFile()
91-
}
92-
})
78+
val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
9379

9480
// Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
9581
val mockStreamMetadataCache = mock[StreamMetadataCache]
@@ -192,15 +178,7 @@ class TestTaskStorageManager extends MockitoSugar {
192178
val ssp = new SystemStreamPartition(ss, partition)
193179
val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName)
194180

195-
// getStoreProperties should always return the same StoreProperties
196-
val mockStorageEngine = mock[StorageEngine]
197-
when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] {
198-
override def answer(invocation: InvocationOnMock): StoreProperties = {
199-
new StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(false).build()
200-
}
201-
})
202-
// Restore simply creates the file
203-
doNothing().when(mockStorageEngine).restore(any())
181+
val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null)
204182

205183
// Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
206184
val mockStreamMetadataCache = mock[StreamMetadataCache]
@@ -308,7 +286,7 @@ class TestTaskStorageManager extends MockitoSugar {
308286
cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*)
309287

310288
assertTrue("Offset file was removed. Clean up failed!", offsetFilePath.exists())
311-
assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffset.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
289+
assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffsets.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
312290
}
313291

314292
@Test
@@ -510,6 +488,167 @@ class TestTaskStorageManager extends MockitoSugar {
510488
//Check conditions
511489
assertTrue("Offset file should not exist!", !offsetFilePath.exists())
512490
}
491+
492+
@Test
def testCleanBaseDirsShouldNotAddNullOffsetsToFileOffsetsMap(): Unit = {
  // An unreadable OFFSET file yields a null offset. If that null were kept in the file offsets map and the
  // full map were passed to SystemAdmin.getOffsetsAfter, an NPE could occur for some SystemAdmin
  // implementations. The consumer is expected to be registered with the oldest offset instead.
  testChangelogConsumerOffsetRegistration(
    oldestOffset = "3",
    newestOffset = "150",
    upcomingOffset = "151",
    expectedRegisteredOffset = "3",
    fileOffset = null,
    writeOffsetFile = true)
}
505+
506+
@Test
def testStartConsumersShouldRegisterCorrectOffsetWhenFileOffsetValid(): Unit = {
  // The file offset is the last changelog message that is also reflected in the store,
  // so the consumer must be registered with the offset AFTER it (139 -> 140).
  testChangelogConsumerOffsetRegistration(
    oldestOffset = "3",
    newestOffset = "150",
    upcomingOffset = "151",
    expectedRegisteredOffset = "140",
    fileOffset = "139",
    writeOffsetFile = true)
}
519+
520+
@Test
def testStartConsumersShouldRegisterCorrectOffsetWhenFileOffsetOlderThanOldestOffset(): Unit = {
  // When the file offset (and therefore the offset after it) is older than the oldest offset still
  // available in the changelog, we should register the oldest offset.
  val writeOffsetFile = true
  val fileOffset = "139"
  val oldestOffset = "145"
  val newestOffset = "150"
  val upcomingOffset = "151"
  val expectedRegisteredOffset = "145"

  testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
}
532+
533+
@Test
def testStartConsumersShouldRegisterCorrectOffsetWhenOldestOffsetGreaterThanZero(): Unit = {
  // No OFFSET file is written, so the consumer is expected to be registered with the oldest offset.
  testChangelogConsumerOffsetRegistration(
    oldestOffset = "3",
    newestOffset = "150",
    upcomingOffset = "151",
    expectedRegisteredOffset = "3",
    fileOffset = null,
    writeOffsetFile = false)
}
544+
545+
/**
 * Shared driver for the changelog consumer offset registration tests.
 *
 * Optionally writes an OFFSET file into the logged store directory, wires up mocked metadata, admin and
 * consumer objects, initializes the task storage manager produced by [[TaskStorageManagerBuilder]] and checks
 * the offset with which the changelog consumer is registered.
 *
 * @param oldestOffset oldest offset reported by the mocked changelog metadata.
 * @param newestOffset newest offset reported by the mocked changelog metadata.
 * @param upcomingOffset upcoming offset reported by the mocked changelog metadata.
 * @param expectedRegisteredOffset offset the consumer is expected to be registered with.
 * @param fileOffset offset to store in the OFFSET file; when null, unreadable content is written instead.
 * @param writeOffsetFile whether an OFFSET file should be written at all.
 */
private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = {
  val systemName = "kafka"
  val streamName = "testStream"
  val partitionCount = 1
  // Basic test setup of SystemStream, SystemStreamPartition for this task
  val ss = new SystemStream(systemName, streamName)
  val partition = new Partition(0)
  val ssp = new SystemStreamPartition(ss, partition)
  val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
  val storeFile = new File(storeDirectory, "store.sst")

  if (writeOffsetFile) {
    val offsetFile = new File(storeDirectory, "OFFSET")
    if (fileOffset != null) {
      Util.writeDataToFile(offsetFile, fileOffset)
    } else {
      // Write garbage to produce a null result when it's read
      val fos = new FileOutputStream(offsetFile)
      try {
        val oos = new ObjectOutputStream(fos)
        oos.writeLong(1)
        oos.writeUTF("Bad Offset")
        oos.close()
      } finally {
        // Always release the file handle, even if one of the writes above fails
        fos.close()
      }
    }
  }

  val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)

  // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
  val mockStreamMetadataCache = mock[StreamMetadataCache]

  val mockSystemAdmin = mock[SystemAdmin]
  val changelogSpec = StreamSpec.createChangeLogStreamSpec(streamName, systemName, partitionCount)
  doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
  // "Offset after" is modelled as numeric offset + 1
  when(mockSystemAdmin.getOffsetsAfter(any())).thenAnswer(new Answer[util.Map[SystemStreamPartition, String]] {
    override def answer(invocation: InvocationOnMock): util.Map[SystemStreamPartition, String] = {
      val originalOffsets = invocation.getArgumentAt(0, classOf[util.Map[SystemStreamPartition, String]])
      originalOffsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
    }
  })
  // Offsets are compared numerically
  when(mockSystemAdmin.offsetComparator(any(), any())).thenAnswer(new Answer[Integer] {
    override def answer(invocation: InvocationOnMock): Integer = {
      val offset1 = invocation.getArgumentAt(0, classOf[String])
      val offset2 = invocation.getArgumentAt(1, classOf[String])
      offset1.toLong compare offset2.toLong
    }
  })

  // The expected offset is asserted at the moment the consumer is registered for our partition
  val mockSystemConsumer = mock[SystemConsumer]
  when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] {
    override def answer(invocation: InvocationOnMock): Unit = {
      val args = invocation.getArguments
      if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) {
        val offset = args.apply(1).asInstanceOf[String]
        assertNotNull(offset)
        assertEquals(expectedRegisteredOffset, offset)
      }
    }
  })
  doNothing().when(mockSystemConsumer).stop()

  // Setup the changelog metadata served by both the metadata cache and the system admin
  val sspMetadata = new SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset)
  val partitionMetadata = new util.HashMap[Partition, SystemStreamPartitionMetadata]()
  partitionMetadata.put(partition, sspMetadata)
  val metadata = new SystemStreamMetadata(streamName, partitionMetadata)
  when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata))

  val streamMetadata = new util.HashMap[String, SystemStreamMetadata]()
  streamMetadata.put(streamName, metadata)
  when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(streamMetadata)

  val taskManager = new TaskStorageManagerBuilder()
    .addStore(loggedStore, mockStorageEngine, mockSystemConsumer)
    .setStreamMetadataCache(mockStreamMetadataCache)
    .setSystemAdmin(systemName, mockSystemAdmin)
    .build

  taskManager.init

  // Guards against the assertions above being skipped because register was never invoked
  verify(mockSystemConsumer).register(any(classOf[SystemStreamPartition]), anyString())
}
631+
632+
/**
 * Builds a mocked [[StorageEngine]] for the storage manager tests.
 *
 * @param isLoggedStore value used for the logged-store flag of the reported store properties.
 * @param isPersistedStore value used for the persisted-to-disk flag of the reported store properties.
 * @param storeFile file that a restore call should create, or null if restore should do nothing.
 * @return the configured mock.
 */
private def createMockStorageEngine(isLoggedStore: Boolean, isPersistedStore: Boolean, storeFile: File) = {
  val engine = mock[StorageEngine]

  // Every getStoreProperties call builds properties from the same flags
  val propertiesAnswer: Answer[StoreProperties] = new Answer[StoreProperties] {
    override def answer(invocation: InvocationOnMock): StoreProperties =
      new StorePropertiesBuilder().setLoggedStore(isLoggedStore).setPersistedToDisk(isPersistedStore).build()
  }
  when(engine.getStoreProperties).thenAnswer(propertiesAnswer)

  if (storeFile == null) {
    // Nothing to materialize on restore
    doNothing().when(engine).restore(any())
  } else {
    // Restoring is simulated by creating the store file
    val createStoreFile: Answer[Unit] = new Answer[Unit] {
      override def answer(invocation: InvocationOnMock): Unit = {
        storeFile.createNewFile()
      }
    }
    when(engine.restore(any())).thenAnswer(createStoreFile)
  }

  engine
}
513652
}
514653

515654
object TaskStorageManagerBuilder {
@@ -536,16 +675,11 @@ class TaskStorageManagerBuilder extends MockitoSugar {
536675
this
537676
}
538677

539-
def addStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = {
540-
taskStores = taskStores ++ {
541-
val mockStorageEngine = mock[StorageEngine]
542-
when(mockStorageEngine.getStoreProperties)
543-
.thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build())
544-
Map(storeName -> mockStorageEngine)
545-
}
546-
storeConsumers = storeConsumers ++ Map(storeName -> mock[SystemConsumer])
547-
changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", "testStream"))
548-
this
678+
def addStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = {
679+
val mockStorageEngine = mock[StorageEngine]
680+
when(mockStorageEngine.getStoreProperties)
681+
.thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build())
682+
addStore(storeName, mockStorageEngine, mock[SystemConsumer])
549683
}
550684

551685
def setPartition(p: Partition) = {

samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,23 @@ class TestUtil {
6464

6565
// Check data returned
6666
assertEquals(data, result)
67+
}
6768

69+
@Test
def testReadInvalidDataFromFile(): Unit = {
  // Put content into the file that the reader is expected to reject, so that it returns null
  val fileStream = new FileOutputStream(file)
  val objectStream = new ObjectOutputStream(fileStream)
  objectStream.writeLong(1)
  objectStream.writeUTF("Junk Data")
  objectStream.close()
  fileStream.close()

  // Read the file back and check that no data is returned
  val readBack = Util.readDataFromFile(file)
  assertNull(readBack)
}
6985

7086
@Test

0 commit comments

Comments
 (0)