
Commit 9b4f2c6

Matteo Seclì authored and agilelab-tmnd1991 committed
[!461] - Remove avro4s
# New features and improvements
- Remove avro4s from dependencies
- Update tests where avro4s was used

# Breaking changes
none

# Migration
none

# Bug fixes
none

# How this feature was tested
Unit Test - Pipeline

# Related issue
Closes #577
1 parent 3558378 commit 9b4f2c6
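The change follows one pattern throughout: the compile-time schema derivation that avro4s provided (AvroSchema[T]) is replaced by hand-written schemas built with the plain Apache Avro SchemaBuilder, plus explicit GenericRecord helpers for serialization. A condensed before/after sketch for the FromKafka test class (illustration only, not part of the diff):

// Before: avro4s derives the Avro schema from the case class at compile time.
//   import com.sksamuel.avro4s.AvroSchema
//   val schema: org.apache.avro.Schema = AvroSchema[FromKafka]
// After: the schema is spelled out by hand with Avro's SchemaBuilder.
import org.apache.avro.{Schema, SchemaBuilder}

case class FromKafka(id: String, exampleAuthor: String, timestamp: Long)

val schema: Schema = SchemaBuilder
  .record("FromKafka")
  .fields()
  .name("id").`type`().stringType().noDefault()
  .name("exampleAuthor").`type`().stringType().noDefault()
  .name("timestamp").`type`().longType().noDefault()
  .endRecord()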

26 files changed: +836 −339 lines changed

consumers-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/eventengine/MailStrategy.scala

Lines changed: 1 addition & 1 deletion
@@ -223,7 +223,7 @@ class InnerMailStrategy(config: Config) {
     sb.toString
   }

-  println("EXPLODE QUERY: " + sqlQuery)
+  // println("EXPLODE QUERY: " + sqlQuery)
   val explodedMails = ss.sql(sqlQuery)

   ss.catalog.dropTempView(mailTableName)

consumers-spark/src/test/scala/it/agilelab/bigdata/wasp/consumers/spark/eventengine/InnerEventStrategySpec.scala

Lines changed: 1 addition & 1 deletion
@@ -157,7 +157,7 @@ class InnerEventStrategySpec extends WordSpec with Matchers with BeforeAndAfter
   s"Process a big random sequence" in {
     import spark.implicits._
     val events: Array[Event] = target.transform(spark.sparkContext.parallelize(randomSeq).toDF).as[Event].collect()
-    println(events.length)
+    val _ = events.length
   }

   s"Find $totalEventsQuantity test events in mixedSeq which match the control event seq" in {
Lines changed: 66 additions & 1 deletion
@@ -1,3 +1,68 @@
 package it.agilelab.bigdata.wasp.consumers.spark.http.data

-case class FromKafka(id: String, exampleAuthor: String, timestamp: Long)
+import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
+import org.apache.avro.io.{BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter, DecoderFactory, EncoderFactory}
+import org.apache.avro.{Schema, SchemaBuilder}
+
+import java.io.ByteArrayOutputStream
+
+case class FromKafka(id: String, exampleAuthor: String, timestamp: Long)
+
+object FromKafka {
+  val schema: Schema = SchemaBuilder
+    .record("FromKafka")
+    .fields()
+    .name("id")
+    .`type`()
+    .stringType()
+    .noDefault()
+    .name("exampleAuthor")
+    .`type`()
+    .stringType()
+    .noDefault()
+    .name("timestamp")
+    .`type`()
+    .longType()
+    .noDefault()
+    .endRecord();
+
+  def toRecord(obj: FromKafka): GenericRecord = {
+    val record = new GenericData.Record(schema)
+    record.put("id", obj.id)
+    record.put("exampleAuthor", obj.exampleAuthor)
+    record.put("timestamp", obj.timestamp)
+    record
+  }
+
+  def fromRecord(record: GenericRecord): FromKafka = {
+    FromKafka(
+      record.get("id").toString,
+      record.get("exampleAuthor").toString,
+      record.get("timestamp").asInstanceOf[Long]
+    )
+  }
+
+  def serializeToBytes(obj: FromKafka, schema: Schema): Array[Byte] = {
+    val byteArrayOutputStream = new ByteArrayOutputStream()
+    val datumWriter: DatumWriter[GenericRecord] =
+      new GenericDatumWriter[GenericRecord](schema)
+    val encoder: BinaryEncoder =
+      EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
+    datumWriter.write(toRecord(obj), encoder)
+    encoder.flush()
+    byteArrayOutputStream.toByteArray // Return the byte array
+  }
+
+  def deserializeFromBytes(
+      bytes: Array[Byte],
+      schema: Schema
+  ): FromKafka = {
+    val byteArrayInputStream = new java.io.ByteArrayInputStream(bytes)
+    val datumReader: DatumReader[GenericRecord] =
+      new GenericDatumReader[GenericRecord](schema)
+    val decoder: BinaryDecoder =
+      DecoderFactory.get().binaryDecoder(byteArrayInputStream, null)
+    val record = datumReader.read(null, decoder)
+    fromRecord(record)
+  }
+}
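For reference, a minimal round trip through the helpers added above (illustration only; it uses nothing beyond what the diff defines):

import it.agilelab.bigdata.wasp.consumers.spark.http.data.FromKafka

// Encode to Avro binary and decode back with the hand-written helpers.
val original = FromKafka(id = "id-1", exampleAuthor = "someAuthor", timestamp = 1545239910L)
val bytes: Array[Byte] = FromKafka.serializeToBytes(original, FromKafka.schema)
val decoded: FromKafka = FromKafka.deserializeFromBytes(bytes, FromKafka.schema)
assert(decoded == original) // all three fields survive the binary round trip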
Lines changed: 59 additions & 0 deletions
@@ -1,3 +1,62 @@
 package it.agilelab.bigdata.wasp.consumers.spark.http.data

+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
+import org.apache.avro.io.{BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter, DecoderFactory, EncoderFactory}
+
+import java.io.ByteArrayOutputStream
+
 case class SampleData(id: String, text: String)
+
+object SampleData {
+  val schema: Schema = SchemaBuilder
+    .record("SampleData")
+    .fields()
+    .name("id")
+    .`type`()
+    .stringType()
+    .noDefault()
+    .name("text")
+    .`type`()
+    .stringType()
+    .noDefault()
+    .endRecord();
+
+  def toRecord(obj: SampleData): GenericRecord = {
+    val record = new GenericData.Record(schema)
+    record.put("id", obj.id)
+    record.put("text", obj.text)
+    record
+  }
+
+  def fromRecord(record: GenericRecord): SampleData = {
+    SampleData(
+      record.get("id").toString,
+      record.get("text").toString
+    )
+  }
+
+  def serializeToBytes(obj: SampleData, schema: Schema): Array[Byte] = {
+    val byteArrayOutputStream = new ByteArrayOutputStream()
+    val datumWriter: DatumWriter[GenericRecord] =
+      new GenericDatumWriter[GenericRecord](schema)
+    val encoder: BinaryEncoder =
+      EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
+    datumWriter.write(toRecord(obj), encoder)
+    encoder.flush()
+    byteArrayOutputStream.toByteArray // Return the byte array
+  }
+
+  def deserializeFromBytes(
+      bytes: Array[Byte],
+      schema: Schema
+  ): SampleData = {
+    val byteArrayInputStream = new java.io.ByteArrayInputStream(bytes)
+    val datumReader: DatumReader[GenericRecord] =
+      new GenericDatumReader[GenericRecord](schema)
+    val decoder: BinaryDecoder =
+      DecoderFactory.get().binaryDecoder(byteArrayInputStream, null)
+    val record = datumReader.read(null, decoder)
+    fromRecord(record)
+  }
+}
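With the macro derivation gone, nothing automatically checks that a hand-written schema still matches its case class. A small, hypothetical sanity check (not in the commit) that compares the declared Avro fields against what SampleData carries, assuming Scala 2.12's JavaConverters:

import scala.collection.JavaConverters._
import org.apache.avro.Schema
import it.agilelab.bigdata.wasp.consumers.spark.http.data.SampleData

// List the (name, type) pairs declared in the hand-written schema.
val declaredFields = SampleData.schema.getFields.asScala.map(f => f.name -> f.schema.getType).toList
assert(declaredFields == List("id" -> Schema.Type.STRING, "text" -> Schema.Type.STRING))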

consumers-spark/src/test/scala/it/agilelab/bigdata/wasp/consumers/spark/http/topic/package.scala

Lines changed: 2 additions & 3 deletions
@@ -1,6 +1,5 @@
 package it.agilelab.bigdata.wasp.consumers.spark.http

-import com.sksamuel.avro4s.AvroSchema
 import it.agilelab.bigdata.wasp.consumers.spark.http.data.{FromKafka, SampleData}
 import it.agilelab.bigdata.wasp.core.utils.JsonConverter
 import it.agilelab.bigdata.wasp.models.TopicModel
@@ -17,7 +16,7 @@ package object topic {
     valueFieldsNames = None,
     useAvroSchemaManager = false,
     schema = JsonConverter
-      .fromString(AvroSchema[FromKafka].toString(false))
+      .fromString(FromKafka.schema.toString(false))
       .getOrElse(org.mongodb.scala.bson.BsonDocument())
   )

@@ -32,7 +31,7 @@ package object topic {
     valueFieldsNames = None,
     useAvroSchemaManager = false,
     schema = JsonConverter
-      .fromString(AvroSchema[SampleData].toString(false))
+      .fromString(SampleData.schema.toString(false))
       .getOrElse(org.mongodb.scala.bson.BsonDocument())
   )
 }
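The topic models above only need the JSON rendering of the schema, so FromKafka.schema.toString(false) takes over from AvroSchema[FromKafka].toString(false). Roughly, the string handed to JsonConverter.fromString now looks like this (illustration only; note that, unlike the avro4s-derived schema, the hand-built record declares no namespace):

// Compact JSON produced by the hand-built schema.
println(FromKafka.schema.toString(false))
// {"type":"record","name":"FromKafka","fields":[{"name":"id","type":"string"},{"name":"exampleAuthor","type":"string"},{"name":"timestamp","type":"long"}]}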

consumers-spark/src/test/scala/it/agilelab/bigdata/wasp/consumers/spark/strategies/gdpr/ConfigUtilsSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ class ConfigUtilsSpec extends FlatSpec with Matchers with TryValues with BeforeA
     |{ "$KV_CONF_KEY" { "$KEYS_TO_DELETE_KEY" = [${keys.mkString(",")}], "$CORRELATION_ID_KEY" = "$correlationId" } }
     |""".stripMargin

-  println(stringConfig)
+  // println(stringConfig)

   val rootConfig = ConfigFactory.parseString(stringConfig)
   val config = ConfigUtils.getOptionalConfig(rootConfig, KV_CONF_KEY)

consumers-spark/src/test/scala/it/agilelab/bigdata/wasp/consumers/spark/strategies/gdpr/HdfsDataDeletionSpec.scala

Lines changed: 2 additions & 2 deletions
@@ -635,7 +635,7 @@ class HdfsDataDeletionSpec extends FlatSpec with Matchers with TryValues with Be

   import spark.implicits._
   val fileNames = readFileNameAndId
-  println(fileNames)
+  // println(fileNames)

   val rawDataStoreConf = RawDataStoreConf(
     keyColumn,
@@ -713,7 +713,7 @@ class HdfsDataDeletionSpec extends FlatSpec with Matchers with TryValues with Be

   import spark.implicits._
   val fileNames = readFileNameAndId
-  println(fileNames)
+  // println(fileNames)

   val rawDataStoreConf = RawDataStoreConf(
     keyColumn,
