From 13e1423c19ea2c10e2873041f91e27ead2191bed Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 6 Jan 2022 11:23:05 +0800 Subject: [PATCH] cherry pick support write back to original tag for jar --- .../src/main/resources/application.conf | 2 ++ .../vesoft/nebula/algorithm/config/Configs.scala | 14 +++++++++++--- .../nebula/algorithm/writer/AlgoWriter.scala | 9 +++++++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf index 38e3f0e..9c82e02 100644 --- a/nebula-algorithm/src/main/resources/application.conf +++ b/nebula-algorithm/src/main/resources/application.conf @@ -45,6 +45,8 @@ space:nb # Nebula tag name, the algorithm result will be write into this tag tag:pagerank + # algorithm result is insert into new tag or update to original tag. type: insert/update + type:insert } } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala index eed6fb1..ea35556 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala @@ -99,8 +99,15 @@ object NebulaConfigEntry { val pswd = nebulaConfig.getString("write.pswd") val writeSpace = nebulaConfig.getString("write.space") val writeTag = nebulaConfig.getString("write.tag") + val writeType = nebulaConfig.getString("write.type") val writeConfigEntry = - NebulaWriteConfigEntry(graphAddress, writeMetaAddress, user, pswd, writeSpace, writeTag) + NebulaWriteConfigEntry(graphAddress, + writeMetaAddress, + user, + pswd, + writeSpace, + writeTag, + writeType) NebulaConfigEntry(readConfigEntry, writeConfigEntry) } } @@ -211,10 +218,11 @@ case class NebulaWriteConfigEntry(graphAddress: String = "", user: String = "", pswd: String = "", space: String = "", - tag: String = "") { + tag: String = "", + writeType: String = "insert") { override def toString: String = { s"NebulaWriteConfigEntry: " + - s"{graphAddress: $graphAddress, user: $user, password: $pswd, space: $space, tag: $tag}" + s"{graphAddress: $graphAddress, user: $user, password: $pswd, space: $space, tag: $tag, type: $writeType}" } } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala index 7a93473..5fbd35a 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala @@ -6,7 +6,7 @@ package com.vesoft.nebula.algorithm.writer import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter -import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig} +import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteMode, WriteNebulaVertexConfig} import com.vesoft.nebula.algorithm.config.{AlgoConstants, Configs} import org.apache.spark.sql.DataFrame @@ -22,6 +22,8 @@ class NebulaWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, c val tag = configs.nebulaConfig.writeConfigEntry.tag val user = configs.nebulaConfig.writeConfigEntry.user val passwd = configs.nebulaConfig.writeConfigEntry.pswd + val writeType = configs.nebulaConfig.writeConfigEntry.writeType + val writeMode = if (writeType.equals("insert")) WriteMode.INSERT else WriteMode.UPDATE val config = NebulaConnectionConfig @@ -30,13 +32,16 @@ class NebulaWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, c .withGraphAddress(graphAddress) .withConenctionRetry(2) .build() - val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig + val nebulaWriteVertexConfig = WriteNebulaVertexConfig .builder() + .withUser(user) + .withPasswd(passwd) .withSpace(space) .withTag(tag) .withVidField(AlgoConstants.ALGO_ID_COL) .withVidAsProp(false) .withBatch(1000) + .withWriteMode(writeMode) .build() data.write.nebula(config, nebulaWriteVertexConfig).writeVertices() }