
Commit 192ee24

support get edge data through ngql (#64)

1 parent 425176c
File tree: 4 files changed, +68 −4 lines

nebula-algorithm/src/main/resources/application.conf

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@
   }
 
   data: {
-    # data source. optional of nebula,csv,json
+    # data source. optional of nebula,nebula-ngql,csv,json
     source: csv
     # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
     sink: csv
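
With source set to nebula-ngql, the read side also needs a graphd address and the nGQL statement itself. A minimal sketch of a matching nebula.read block (the key names read.graphAddress and read.ngql come from the Configs.scala change below; the addresses, space, labels, and query text are illustrative):

  nebula: {
    read: {
      # Nebula metad server address
      metaAddress: "127.0.0.1:9559"
      # Nebula graphd server address, needed only for the nebula-ngql source
      graphAddress: "127.0.0.1:9669"
      space: basketballplayer
      labels: ["serve"]
      weightCols: ["start_year"]
      # statement whose result supplies the edges to read
      ngql: "GO FROM \"player100\" OVER serve YIELD edge AS e"
    }
  }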

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala

Lines changed: 4 additions & 0 deletions
@@ -119,6 +119,10 @@ object Main {
         val reader = new NebulaReader(spark, configs, partitionNum)
         reader.read()
       }
+      case "nebula-ngql" => {
+        val reader = new NebulaReader(spark, configs, partitionNum)
+        reader.readNgql()
+      }
       case "csv" => {
         val reader = new CsvReader(spark, configs, partitionNum)
         reader.read()
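
With this branch in place, switching an existing job to nGQL reading should be a configuration change only: set data.source to nebula-ngql and resubmit. A typical invocation, assuming the project's usual entry point and its -p option for the config path (jar name and master are illustrative):

  spark-submit --master local --class com.vesoft.nebula.algorithm.Main \
    nebula-algorithm.jar -p application.conf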

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala

Lines changed: 15 additions & 3 deletions
@@ -90,8 +90,18 @@ object NebulaConfigEntry {
     } else {
       List()
     }
-    val readConfigEntry =
+    val readConfigEntry = if (nebulaConfig.hasPath("read.ngql")) {
+      val readGraphAddress = nebulaConfig.getString("read.graphAddress")
+      val ngql = nebulaConfig.getString("read.ngql")
+      NebulaReadConfigEntry(readMetaAddress,
+                            readSpace,
+                            readLabels,
+                            readWeightCols,
+                            readGraphAddress,
+                            ngql)
+    } else {
       NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols)
+    }
 
     val graphAddress = nebulaConfig.getString("write.graphAddress")
     val writeMetaAddress = nebulaConfig.getString("write.metaAddress")
@@ -203,11 +213,13 @@ case class NebulaConfigEntry(readConfigEntry: NebulaReadConfigEntry,
 case class NebulaReadConfigEntry(address: String = "",
                                  space: String = "",
                                  labels: List[String] = List(),
-                                 weightCols: List[String] = List()) {
+                                 weightCols: List[String] = List(),
+                                 graphAddress: String = "",
+                                 ngql: String = "") {
   override def toString: String = {
     s"NebulaReadConfigEntry: " +
       s"{address: $address, space: $space, labels: ${labels.mkString(",")}, " +
-      s"weightCols: ${weightCols.mkString(",")}}"
+      s"weightCols: ${weightCols.mkString(",")}, ngql: $ngql}"
   }
 }
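
The new branch keys off whether read.ngql is present at all. A self-contained sketch of that hasPath pattern with the Typesafe Config API that Configs.scala builds on (the HOCON fragment and printed output are illustrative):

import com.typesafe.config.ConfigFactory

object NgqlConfigCheck extends App {
  // A tiny HOCON fragment shaped like the read section of application.conf.
  val nebulaConfig = ConfigFactory.parseString(
    """
      |read: {
      |  metaAddress: "127.0.0.1:9559"
      |  graphAddress: "127.0.0.1:9669"
      |  ngql: "GO FROM \"player100\" OVER serve YIELD edge AS e"
      |}
    """.stripMargin)

  // Same check as NebulaConfigEntry: only read the nGQL-specific keys
  // when an nGQL query is actually configured.
  if (nebulaConfig.hasPath("read.ngql")) {
    val graphAddress = nebulaConfig.getString("read.graphAddress")
    val ngql = nebulaConfig.getString("read.ngql")
    println(s"reading edges from $graphAddress with: $ngql")
  }
}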

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala

Lines changed: 48 additions & 0 deletions
@@ -65,6 +65,54 @@ class NebulaReader(spark: SparkSession, configs: Configs, partitionNum: String)
     }
     dataset
   }
+
+  def readNgql(): DataFrame = {
+    val metaAddress = configs.nebulaConfig.readConfigEntry.address
+    val graphAddress = configs.nebulaConfig.readConfigEntry.graphAddress
+    val space = configs.nebulaConfig.readConfigEntry.space
+    val labels = configs.nebulaConfig.readConfigEntry.labels
+    val weights = configs.nebulaConfig.readConfigEntry.weightCols
+    val partition = partitionNum.toInt
+    val ngql = configs.nebulaConfig.readConfigEntry.ngql
+
+    val config =
+      NebulaConnectionConfig
+        .builder()
+        .withMetaAddress(metaAddress)
+        .withGraphAddress(graphAddress)
+        .withConenctionRetry(2)
+        .build()
+
+    var dataset: DataFrame = null
+    for (i <- labels.indices) {
+      val returnCols: ListBuffer[String] = new ListBuffer[String]
+      if (configs.dataSourceSinkEntry.hasWeight && weights.nonEmpty) {
+        returnCols.append(weights(i))
+      }
+      val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
+        .builder()
+        .withSpace(space)
+        .withLabel(labels(i))
+        .withPartitionNum(partition)
+        .withNgql(ngql)
+        .build()
+      if (dataset == null) {
+        dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
+        if (weights.nonEmpty) {
+          dataset = dataset.select("_srcId", "_dstId", weights(i))
+        }
+      } else {
+        var df = spark.read
+          .nebula(config, nebulaReadEdgeConfig)
+          .loadEdgesToDF()
+        if (weights.nonEmpty) {
+          df = df.select("_srcId", "_dstId", weights(i))
+        }
+        dataset = dataset.union(df)
+      }
+    }
+    dataset
+  }
 }
 
 class CsvReader(spark: SparkSession, configs: Configs, partitionNum: String)
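
Two details of readNgql are worth noting. First, returnCols is populated but never passed to the ReadNebulaConfig builder: with an nGQL read the returned columns are determined by the query, and the weight column is projected afterwards with select. Second, the per-label results are accumulated through a null-seeded var and union. A sketch of the same accumulation without mutable state, assuming the same connector API and at least one configured label (readNgqlFolded is a hypothetical name, not part of this commit):

  // Hypothetical alternative inside NebulaReader, not part of this commit.
  def readNgqlFolded(): DataFrame = {
    val entry = configs.nebulaConfig.readConfigEntry
    val config = NebulaConnectionConfig
      .builder()
      .withMetaAddress(entry.address)
      .withGraphAddress(entry.graphAddress)
      .withConenctionRetry(2) // the connector API spells the method this way
      .build()

    val frames = entry.labels.indices.map { i =>
      val readConfig = ReadNebulaConfig
        .builder()
        .withSpace(entry.space)
        .withLabel(entry.labels(i))
        .withPartitionNum(partitionNum.toInt)
        .withNgql(entry.ngql)
        .build()
      val df = spark.read.nebula(config, readConfig).loadEdgesToDF()
      if (entry.weightCols.nonEmpty) df.select("_srcId", "_dstId", entry.weightCols(i))
      else df
    }
    // The original returns null for an empty labels list; reduce would throw instead.
    frames.reduce(_ union _)
  }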
