From cc3b92be8c1c38402fd27cfc2f4135cfdc14a00f Mon Sep 17 00:00:00 2001 From: yshah26 <126716161+yshah26@users.noreply.github.com> Date: Tue, 4 Apr 2023 13:10:40 -0400 Subject: [PATCH] Modified timeouts --- .../ONA.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/Unlock organizational network insights using Microsoft 365 datasets/ONA.json b/templates/Unlock organizational network insights using Microsoft 365 datasets/ONA.json index 4d2e7762..4036660c 100644 --- a/templates/Unlock organizational network insights using Microsoft 365 datasets/ONA.json +++ b/templates/Unlock organizational network insights using Microsoft 365 datasets/ONA.json @@ -1 +1 @@ -{"$schema":"http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#","contentVersion":"1.0.0.0","parameters":{"workspaceName":{"type":"string","metadata":"Workspace name","defaultValue":"ona-synapse"},"Microsoft365":{"type":"string"},"MGDCSink":{"type":"string"}},"variables":{"workspaceId":"[concat('Microsoft.Synapse/workspaces/', parameters('workspaceName'))]"},"resources":[{"name":"[concat(parameters('workspaceName'), '/ONA')]","type":"Microsoft.Synapse/workspaces/pipelines","apiVersion":"2019-06-01-preview","properties":{"activities":[{"name":"If GetUserData is true","type":"IfCondition","dependsOn":[{"activity":"Set User FolderName","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.GetUserData","type":"Expression"},"ifTrueActivities":[{"name":"Copy 
User","type":"Copy","dependsOn":[],"policy":{"timeout":"0.12:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"Office365Source","outputColumns":[{"name":"country"},{"name":"department"},{"name":"id"},{"name":"jobTitle"},{"name":"mail"},{"name":"preferredLanguage"},{"name":"state"}]},"sink":{"type":"BinarySink","storeSettings":{"type":"AzureBlobFSWriteSettings"}},"enableStaging":false},"inputs":[{"referenceName":"Microsoft365UserTable","type":"DatasetReference","parameters":{}}],"outputs":[{"referenceName":"UserSink","type":"DatasetReference","parameters":{"FolderName":{"value":"@variables('UserFolderName')","type":"Expression"}}}]}]}},{"name":"If GetEmailData is true","type":"IfCondition","dependsOn":[{"activity":"Set Email FolderName","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.GetEmailData","type":"Expression"},"ifTrueActivities":[{"name":"Copy 
Email","type":"Copy","dependsOn":[],"policy":{"timeout":"0.12:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"Office365Source","dateFilterColumn":"sentDateTime","startTime":{"value":"@formatDateTime(pipeline().parameters.StartDate)","type":"Expression"},"endTime":{"value":"@formatDateTime(pipeline().parameters.EndDate)","type":"Expression"},"outputColumns":[{"name":"receivedDateTime"},{"name":"sentDateTime"},{"name":"sender"},{"name":"from"},{"name":"toRecipients"},{"name":"ccRecipients"},{"name":"bccRecipients"},{"name":"conversationId"},{"name":"isRead"},{"name":"createdDateTime"},{"name":"id"},{"name":"conversationIndex"}]},"sink":{"type":"BinarySink","storeSettings":{"type":"AzureBlobFSWriteSettings"}},"enableStaging":false},"inputs":[{"referenceName":"Microsoft365EmailTable","type":"DatasetReference","parameters":{}}],"outputs":[{"referenceName":"EmailSink","type":"DatasetReference","parameters":{"FolderName":{"value":"@variables('EmailFolderName')","type":"Expression"}}}]}]}},{"name":"If GetTeamsChatData is true","type":"IfCondition","dependsOn":[{"activity":"Set TeamsChat FolderName","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.GetTeamsChatData","type":"Expression"},"ifTrueActivities":[{"name":"Copy 
TeamsChat","type":"Copy","dependsOn":[],"policy":{"timeout":"0.12:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"Office365Source","dateFilterColumn":"SentDateTime","startTime":{"value":"@formatDateTime(pipeline().parameters.StartDate)","type":"Expression"},"endTime":{"value":"@formatDateTime(pipeline().parameters.EndDate)","type":"Expression"},"outputColumns":[{"name":"Id"},{"name":"ReceivedDateTime"},{"name":"SentDateTime"},{"name":"InternetMessageId"},{"name":"ConversationId"},{"name":"ConversationIndex"},{"name":"Sender"},{"name":"From"},{"name":"ToRecipients"}]},"sink":{"type":"BinarySink","storeSettings":{"type":"AzureBlobFSWriteSettings"}},"enableStaging":false},"inputs":[{"referenceName":"Microsoft365TeamsChatTable","type":"DatasetReference","parameters":{}}],"outputs":[{"referenceName":"TeamsChatSink","type":"DatasetReference","parameters":{"FolderName":{"value":"@variables('TeamsChatFolderName')","type":"Expression"}}}]}]}},{"name":"If GetCalendarData is true","type":"IfCondition","dependsOn":[{"activity":"Set Calendar FileName","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.GetCalendarData","type":"Expression"},"ifTrueActivities":[{"name":"Copy 
Calendar","type":"Copy","dependsOn":[],"policy":{"timeout":"0.12:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"Office365Source","startTime":{"value":"@formatDateTime(pipeline().parameters.StartDate)","type":"Expression"},"endTime":{"value":"@formatDateTime(pipeline().parameters.EndDate)","type":"Expression"},"outputColumns":[{"name":"id"},{"name":"createdDateTime"},{"name":"iCalUId"},{"name":"start"},{"name":"end"},{"name":"isAllDay"},{"name":"isCancelled"},{"name":"attendees"},{"name":"organizer"}]},"sink":{"type":"BinarySink","storeSettings":{"type":"AzureBlobFSWriteSettings"}},"enableStaging":false},"inputs":[{"referenceName":"Microsoft365CalendarTable","type":"DatasetReference","parameters":{}}],"outputs":[{"referenceName":"CalendarSink","type":"DatasetReference","parameters":{"FolderName":{"value":"@variables('CalendarFolderName')","type":"Expression"}}}]}]}},{"name":"Run ONA Notebook","type":"SynapseNotebook","dependsOn":[{"activity":"Convert StorageUrl to Abfss Path","dependencyConditions":["Succeeded"]}],"policy":{"timeout":"0.12:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"notebook":{"referenceName":"ONA","type":"NotebookReference"},"parameters":{"calendarPath":{"value":{"value":"@concat(variables('StorageAbfssPath'), variables('CalendarFolderName'))","type":"Expression"},"type":"string"},"emailPath":{"value":{"value":"@concat(variables('StorageAbfssPath'), variables('EmailFolderName'))\n","type":"Expression"},"type":"string"},"teamsChatPath":{"value":{"value":"@concat(variables('StorageAbfssPath'), variables('TeamsChatFolderName'))","type":"Expression"},"type":"string"},"userPath":{"value":{"value":"@concat(variables('StorageAbfssPath'), 
variables('UserFolderName'))","type":"Expression"},"type":"string"},"outputFormat":{"value":"csv","type":"string"},"usersOutputPath":{"value":{"value":"@concat(replace(variables('StorageAbfssPath'),'mgdc@','output@'),'users_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate, '.csv')","type":"Expression"},"type":"string"},"interactionsOutputPath":{"value":{"value":"@concat(replace(variables('StorageAbfssPath'),'mgdc@','output@'),'interactions_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate, '.csv')","type":"Expression"},"type":"string"},"period":{"value":{"value":"@concat(pipeline().parameters.StartDate, ' to ', pipeline().parameters.EndDate)","type":"Expression"},"type":"string"},"obfuscateEmails":{"value":{"value":"@bool(1)","type":"Expression"},"type":"bool"},"leidenMaxClusterSize":{"value":"1000","type":"int"}},"snapshot":true,"sparkPool":{"referenceName":"onasynapsepool","type":"BigDataPoolReference"},"executorSize":null,"conf":{"spark.dynamicAllocation.enabled":null,"spark.dynamicAllocation.minExecutors":null,"spark.dynamicAllocation.maxExecutors":null},"driverSize":null,"numExecutors":null}},{"name":"Set User FolderName","type":"SetVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"UserFolderName","value":{"value":"@concat('user_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate)","type":"Expression"}}},{"name":"Set Email FolderName","type":"SetVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"EmailFolderName","value":{"value":"@concat('email_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate)","type":"Expression"}}},{"name":"Set TeamsChat FolderName","type":"SetVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"TeamsChatFolderName","value":{"value":"@concat('teamschat_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate)","type":"Expression"}}},{"name":"Set 
Calendar FileName","type":"SetVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"CalendarFolderName","value":{"value":"@concat('calendar_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate)","type":"Expression"}}},{"name":"Convert StorageUrl to Abfss Path","type":"SetVariable","dependsOn":[{"activity":"If GetCalendarData is true","dependencyConditions":["Succeeded"]},{"activity":"If GetTeamsChatData is true","dependencyConditions":["Succeeded"]},{"activity":"If GetEmailData is true","dependencyConditions":["Succeeded"]},{"activity":"If GetUserData is true","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"variableName":"StorageAbfssPath","value":{"value":"@concat(replace(replace(trim(pipeline().parameters.StorageUrl), 'https://', 'abfss://mgdc@'),'dfs.core.windows.net/','dfs.core.windows.net'),'/')\n","type":"Expression"}}}],"policy":{"elapsedTimeMetric":{},"cancelAfter":{}},"parameters":{"StartDate":{"type":"string"},"EndDate":{"type":"string"},"StorageUrl":{"type":"string","defaultValue":"https://.dfs.core.windows.net/"},"GetUserData":{"type":"bool","defaultValue":true},"GetEmailData":{"type":"bool","defaultValue":true},"GetCalendarData":{"type":"bool","defaultValue":true},"GetTeamsChatData":{"type":"bool","defaultValue":true}},"variables":{"CalendarFolderName":{"type":"String"},"EmailFolderName":{"type":"String"},"TeamsChatFolderName":{"type":"String"},"UserFolderName":{"type":"String"},"StorageAbfssPath":{"type":"String"}},"annotations":[],"lastPublishTime":"2023-03-15T07:49:35Z"},"dependsOn":["[concat(variables('workspaceId'), '/notebooks/ONA')]","[concat(variables('workspaceId'), '/bigDataPools/onasynapsepool')]","[concat(variables('workspaceId'), '/datasets/Microsoft365UserTable')]","[concat(variables('workspaceId'), '/datasets/UserSink')]","[concat(variables('workspaceId'), '/datasets/Microsoft365EmailTable')]","[concat(variables('workspaceId'), 
'/datasets/EmailSink')]","[concat(variables('workspaceId'), '/datasets/Microsoft365TeamsChatTable')]","[concat(variables('workspaceId'), '/datasets/TeamsChatSink')]","[concat(variables('workspaceId'), '/datasets/Microsoft365CalendarTable')]","[concat(variables('workspaceId'), '/datasets/CalendarSink')]"]},{"name":"[concat(parameters('workspaceName'), '/ONA')]","type":"Microsoft.Synapse/workspaces/notebooks","apiVersion":"2019-06-01-preview","properties":{"nbformat":4,"nbformat_minor":2,"bigDataPool":{"referenceName":"onasynapsepool","type":"BigDataPoolReference"},"sessionProperties":{"driverMemory":"28g","driverCores":4,"executorMemory":"28g","executorCores":4,"numExecutors":2,"runAsWorkspaceSystemIdentity":false,"conf":{"spark.dynamicAllocation.enabled":"false","spark.dynamicAllocation.minExecutors":"2","spark.dynamicAllocation.maxExecutors":"2","spark.autotune.trackingId":"52f8a9ae-5289-4bec-9d36-d296b6909cef"}},"metadata":{"saveOutput":true,"synapse_widget":{"version":"0.1","state":{}},"enableDebugMode":false,"kernelspec":{"name":"synapse_pyspark","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"a365ComputeOptions":{"id":"/subscriptions/30a81c99-6121-40ba-99d7-ac674961cd7e/resourceGroups/rg-mgdc-ona/providers/Microsoft.Synapse/workspaces/ona-synapse/bigDataPools/onasynapsepool","name":"onasynapsepool","type":"Spark","endpoint":"https://ona-synapse.dev.azuresynapse.net/livyApi/versions/2019-11-01-preview/sparkPools/onasynapsepool","auth":{"type":"AAD","authResource":"https://dev.azuresynapse.net"},"sparkVersion":"3.2","nodeCount":3,"cores":4,"memory":28,"automaticScaleJobs":false},"sessionKeepAliveTimeout":30},"cells":[{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"tags":["parameters"]},"source":["# Default parameters that can be freely changed or overriden by pipeline run \r\n","\r\n","# Inputs\r\n","calendarPath = 
\"abfss://mgdc@onastorage.dfs.core.windows.net/calendar_2022-06-01_to_2022-11-07/\"\r\n","emailPath = \"abfss://mgdc@onastorage.dfs.core.windows.net/email_2022-06-01_to_2022-11-07/\"\r\n","teamsChatPath = \"abfss://mgdc@onastorage.dfs.core.windows.net/teamschat_2022-06-01_to_2022-11-07/\"\r\n","userPath = \"abfss://mgdc@onastorage.dfs.core.windows.net/user_2022-06-01_to_2022-11-07/\"\r\n","\r\n","#Output Format: Can be csv or parquet\r\n","outputFormat = \"csv\"\r\n","\r\n","# Output path of user vertices\r\n","usersOutputPath = \"abfss://output@onastorage.dfs.core.windows.net/users_2022-06-01_to_2022-11-07_weighted.csv\"\r\n","\r\n","# Output path of user to user edges\r\n","interactionsOutputPath = \"abfss://output@onastorage.dfs.core.windows.net/interactions_2022-06-01_to_2022-11-07_weighted.csv\"\r\n","\r\n","# StartDate/EndDate for this run that is denormalized to users and interactions tables\r\n","period = \"2022-06-01 to 2022-11-07\"\r\n","\r\n","# Whether or not to md5 hash the input user emails\r\n","obfuscateEmails = True\r\n","\r\n","# Leiden max cluster size, the maximum possible size for a detected community\r\n","leidenMaxClusterSize = 1000"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"microsoft":{},"collapsed":false},"source":["# Load data\r\n","try:\r\n"," emailsRaw = spark.read.json(emailPath)\r\n","except (Exception) as error:\r\n"," print(error)\r\n"," raise Exception(\"Emails data not loaded, continuing without emails\")\r\n","\r\n","try:\r\n"," meetingsRaw = spark.read.json(calendarPath)\r\n","except (Exception) as error:\r\n"," print(error)\r\n"," raise Exception(\"Calendar data not loaded, continuing without meetings\")\r\n","\r\n","try:\r\n"," teamsChatsRaw = spark.read.json(teamsChatPath)\r\n","except (Exception) as error:\r\n"," print(error)\r\n"," raise Exception(\"TeamsChats data not loaded, continuing without teams messages\")\r\n","\r\n","try:\r\n"," usersRaw = spark.read.json(userPath)\r\n","except 
(Exception) as error:\r\n"," print(error)\r\n"," raise Exception(\"Users data not loaded. Check the file path.\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Drop duplicates\r\n","usersDedup = usersRaw.dropDuplicates([\"puser\"])\r\n","emailsDedup = emailsRaw.dropDuplicates([\"Id\"]).select(\"Sender\", \"ToRecipients\")\r\n","teamschatsDedup = teamsChatsRaw.dropDuplicates([\"Id\"]).select(\"Sender\", \"ToRecipients\")\r\n","meetingsDedup = meetingsRaw.dropDuplicates([\"Id\"]).select(\"organizer\", \"attendees\", \"start\", \"end\", \"isAllDay\", \"isCancelled\", \"isOrganizer\", \"iCalUId\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["from pyspark.sql.functions import coalesce, col, count, explode, format_number, isnull, lit, md5, rand, size, udf, unix_timestamp\r\n","import pyspark.sql.functions as F\r\n","from pyspark.sql import types as t"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Get the user email addresses and filter emails, teamschat, and meetings to only contain edges with those users\r\n","usersEmailAddresses = usersDedup.selectExpr(\"lower(mail) as id\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Explode row with one sender -> N recipients into N rows\r\n","# Filter to only keep emails with 8 or less recipients\r\n","emails = emailsDedup.where(size(col(\"ToRecipients\")) <= 8) \\\r\n"," .withColumn(\"weight\", 1.0/size(col(\"ToRecipients\"))) \\\r\n"," 
.select(F.lower(col(\"Sender.EmailAddress.Address\")).alias(\"sender\"), col(\"weight\"), explode(col(\"ToRecipients\")).alias(\"exploded\")) \\\r\n"," .join(usersEmailAddresses, col(\"id\") == col(\"sender\"), \"inner\").drop(\"id\") \\\r\n"," .join(usersEmailAddresses, col(\"id\") == F.lower(col(\"exploded.EmailAddress.Address\")), \"inner\").drop(\"id\") \\\r\n"," .withColumnRenamed(\"sender\", \"src\") \\\r\n"," .withColumn(\"dst\", F.lower(col(\"exploded.EmailAddress.Address\"))) \\\r\n"," .select(col(\"src\"), col(\"dst\"), col(\"weight\")) \\\r\n"," .where(col(\"src\") != col(\"dst\"))\r\n","if obfuscateEmails:\r\n"," emails = emails.withColumn(\"srcHash\", md5(col(\"src\"))) \\\r\n"," .withColumn(\"dstHash\", md5(col(\"dst\"))) \\\r\n"," .drop(\"src\", \"dst\").selectExpr(\"srcHash as src\", \"dstHash as dst\", \"weight\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Explode row with one organizer -> N attendees into N rows\r\n","# Filter to only keep meetings at least 2 and at most 9 attendees. 
(Number of attendees includes the organizer)\r\n","# Filter out cancelled and all day meetings\r\n","# Filter to the meeting instance belonging to the organizer's calendar\r\n","# Weight by meeting length in seconds divided by 400 (6.67 minutes) and divided by the number of recipients\r\n","\r\n","dtFormat = \"yyyy-MM-dd'T'HH:mm:ss.SSSSSSS\"\r\n","meetings = meetingsDedup.where((size(col(\"attendees\")) <= 9) & (size(col(\"attendees\")) >= 2)) \\\r\n"," .where((col(\"isAllDay\") == False) & (col(\"isCancelled\") == False) & (col(\"isOrganizer\") == True)) \\\r\n"," .withColumn(\"meetingDurationInSeconds\", unix_timestamp(col(\"end.dateTime\"), dtFormat).cast(\"long\") - unix_timestamp(col(\"start.dateTime\"), dtFormat).cast(\"long\")) \\\r\n"," .withColumn(\"weight\", (col(\"meetingDurationInSeconds\")/400.0) / (size(col(\"attendees\")) - 1)) \\\r\n"," .select(F.lower(col(\"organizer.emailAddress.address\")).alias(\"sender\"), col(\"weight\"), col(\"meetingDurationInSeconds\"), col(\"attendees\"), col(\"iCalUId\"), explode(col(\"attendees\")).alias(\"exploded\")) \\\r\n"," .join(usersEmailAddresses, col(\"id\") == col(\"sender\"), \"inner\").drop(\"id\") \\\r\n"," .join(usersEmailAddresses, col(\"id\") == F.lower(col(\"exploded.EmailAddress.Address\")), \"inner\").drop(\"id\") \\\r\n"," .withColumnRenamed(\"sender\", \"src\") \\\r\n"," .withColumn(\"dst\", F.lower(col(\"exploded.EmailAddress.Address\"))) \\\r\n"," .select(col(\"src\"), col(\"dst\"), col(\"weight\"), col(\"meetingDurationInSeconds\"), col(\"iCalUId\"), col(\"attendees\")) \\\r\n"," .where(col(\"src\") != col(\"dst\"))\r\n","if obfuscateEmails:\r\n"," meetings = meetings.withColumn(\"srcHash\", md5(col(\"src\"))) \\\r\n"," .withColumn(\"dstHash\", md5(col(\"dst\"))) \\\r\n"," .drop(\"src\", \"dst\").selectExpr(\"srcHash as src\", \"dstHash as dst\", \"weight\", \"meetingDurationInSeconds\", 
\"iCalUId\",\"attendees\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Explode row with one sender -> N recipients into N rows\r\n","# Filter to only keep teamschat messages with 8 or less recipients\r\n","teamschats = teamschatsDedup.where(size(col(\"ToRecipients\")) <= 8) \\\r\n"," .withColumn(\"weight\", 1.0/(8*size(col(\"ToRecipients\")))) \\\r\n"," .select(F.lower(col(\"Sender.EmailAddress.Address\")).alias(\"sender\"), col(\"weight\"), explode(col(\"ToRecipients\")).alias(\"exploded\")) \\\r\n"," .join(usersEmailAddresses, col(\"id\") == col(\"sender\"), \"inner\").drop(\"id\") \\\r\n"," .join(usersEmailAddresses, col(\"id\") == F.lower(col(\"exploded.EmailAddress.Address\")), \"inner\").drop(\"id\") \\\r\n"," .withColumnRenamed(\"sender\", \"src\") \\\r\n"," .withColumn(\"dst\", F.lower(col(\"exploded.EmailAddress.Address\"))) \\\r\n"," .select(col(\"src\"), col(\"dst\"), col(\"weight\")) \\\r\n"," .where(col(\"src\") != col(\"dst\"))\r\n","if obfuscateEmails:\r\n"," teamschats = teamschats.withColumn(\"srcHash\", md5(col(\"src\"))) \\\r\n"," .withColumn(\"dstHash\", md5(col(\"dst\"))) \\\r\n"," .drop(\"src\", \"dst\").selectExpr(\"srcHash as src\", \"dstHash as dst\", \"weight\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Join after counting and summing weights from emails, teams chats, and meetings\r\n","emailEdges = emails.groupBy(\"src\", \"dst\").agg(F.count(col(\"dst\")).alias(\"InteractionsEmail\"), F.sum(col(\"weight\")).alias(\"EmailWeight\")) \\\r\n"," .withColumnRenamed(\"src\", \"src1\").withColumnRenamed(\"dst\", \"dst1\")\r\n","\r\n","meetingEdges = meetings.groupBy(\"src\", 
\"dst\").agg(F.count(col(\"dst\")).alias(\"InteractionsMeetings\"), F.sum(col(\"weight\")).alias(\"MeetingsWeight\")) \\\r\n"," .withColumnRenamed(\"src\", \"src2\").withColumnRenamed(\"dst\", \"dst2\")\r\n","\r\n","teamsChatEdges = teamschats.groupBy(\"src\", \"dst\").agg(F.count(col(\"dst\")).alias(\"InteractionsTeamsChat\"), F.sum(col(\"weight\")).alias(\"TeamsChatWeight\")) \\\r\n"," .withColumnRenamed(\"src\", \"src3\").withColumnRenamed(\"dst\", \"dst3\")\r\n","\r\n","allEdges = emailEdges.alias(\"e\").join(meetingEdges.alias(\"m\"), (col(\"src1\") == col(\"src2\")) & (col(\"dst1\") == col(\"dst2\")), \"full\") \\\r\n"," .join(teamsChatEdges.alias(\"t\"), (col(\"src1\") == col(\"src3\")) & (col(\"dst1\") == col(\"dst3\")), \"full\")\r\n"," "],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Coalesce together src/dst duplicate columns after join\r\n","teamsChatToEmailRatio = 8 # interaction ratio for teamschat to email\r\n","edgesCombined = allEdges.select(\r\n"," coalesce( *[col(c) for c in [\"src1\", \"src2\", \"src3\"]]).alias(\"Source\"),\r\n"," coalesce( *[col(c) for c in [\"dst1\", \"dst2\", \"dst3\"]]).alias(\"Target\"),\r\n"," col(\"InteractionsEmail\"),\r\n"," col(\"InteractionsMeetings\"),\r\n"," col(\"InteractionsTeamsChat\"),\r\n"," col(\"EmailWeight\"),\r\n"," col(\"MeetingsWeight\"),\r\n"," col(\"TeamsChatWeight\")\r\n"," ).fillna(0) \\\r\n"," .withColumn(\"Interactions\", (col(\"InteractionsEmail\") + col(\"InteractionsMeetings\") + F.round(col(\"InteractionsTeamsChat\")/teamsChatToEmailRatio)).cast('int')) \\\r\n"," .withColumn(\"InteractionsWeight\", (col(\"EmailWeight\") + col(\"MeetingsWeight\") + col(\"TeamsChatWeight\")/teamsChatToEmailRatio)) \\\r\n"," .withColumn(\"Period\", lit(period))\r\n","\r\n","if outputFormat == \"csv\":\r\n"," edgesCombined.coalesce(1).write.option(\"header\", 
True).mode(\"overwrite\").csv(interactionsOutputPath)\r\n"," \r\n"," Path = sc._gateway.jvm.org.apache.hadoop.fs.Path\r\n"," # get the part file generated by spark write\r\n"," fs = Path(interactionsOutputPath).getFileSystem(sc._jsc.hadoopConfiguration())\r\n"," part_file = fs.globStatus(Path(interactionsOutputPath + \"/part*\"))[0].getPath()\r\n"," # set final target path\r\n"," target_path_interactions = interactionsOutputPath+\".\"+outputFormat\r\n"," # move and rename the file\r\n"," fs.rename(part_file, Path(target_path_interactions))\r\n"," fs.delete(Path(interactionsOutputPath), True)\r\n","elif outputFormat == \"parquet\":\r\n"," edgesCombined.write.option(\"header\", True).mode(\"overwrite\").parquet(interactionsOutputPath)\r\n","else:\r\n"," raise Exception (\"outputFormat should be csv or parquet\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["eventsOrganized = meetings.groupBy(\"src\").count().withColumnRenamed(\"count\", \"NumberOfEventsOrganized\")\r\n","eventsAttended = meetings.groupBy(\"dst\").count().withColumnRenamed(\"count\", \"NumberOfEventsAttended\")\r\n","emailsSent = emails.groupBy(\"src\").count().withColumnRenamed(\"count\", \"NumberOfEmailsSent\")\r\n","emailsReceived = emails.groupBy(\"dst\").count().withColumnRenamed(\"count\", \"NumberOfEmailsReceived\")\r\n","teamsChatsSent = teamschats.groupBy(\"src\").count().withColumnRenamed(\"count\", \"NumberOfChatsSent\")\r\n","teamsChatsReceived = teamschats.groupBy(\"dst\").count().withColumnRenamed(\"count\", \"NumberOfChatsReceived\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Select user properties for output and join all raw email/teamschat/meeting counts\r\n","if 
obfuscateEmails:\r\n"," usersDedup = usersDedup.withColumn(\"EmailAddress\", md5(F.lower(col(\"mail\"))))\r\n","else:\r\n"," usersDedup = usersDedup.withColumn(\"EmailAddress\", F.lower(col(\"mail\")))\r\n","usersRenamed = usersDedup.selectExpr(\"EmailAddress\", \"department as Department\", \"jobTitle as Title\", \"state as StateOrProvince\",\r\n"," \"country as Country\",\"preferredLanguage as Languages\",\"ptenant as TenantID\")\r\n","usersJoined = usersRenamed.join(eventsOrganized, col(\"src\") == col(\"EmailAddress\"), \"left\").drop(\"src\") \\\r\n"," .join(eventsAttended, col(\"dst\") == col(\"EmailAddress\"), \"left\").drop(\"dst\") \\\r\n"," .join(emailsSent, col(\"src\") == col(\"EmailAddress\"), \"left\").drop(\"src\") \\\r\n"," .join(emailsReceived, col(\"dst\") == col(\"EmailAddress\"), \"left\").drop(\"dst\") \\\r\n"," .join(teamsChatsSent, col(\"src\") == col(\"EmailAddress\"), \"left\").drop(\"src\") \\\r\n"," .join(teamsChatsReceived, col(\"dst\") == col(\"EmailAddress\"), \"left\").drop(\"dst\") \\\r\n"," .fillna(0)\r\n","numUsers = usersJoined.count()"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Calculate out-degrees and in-degrees based on number of connections\r\n","outDegreeEdges = edgesCombined.where(col(\"Interactions\") > 0).groupBy(\"Source\").count().select(col(\"Source\"), col(\"count\").alias(\"Out-DegreeIndex\"))\r\n","inDegreeEdges = edgesCombined.where(col(\"Interactions\") > 0).groupBy(\"Target\").count().select(col(\"Target\"), col(\"count\").alias(\"In-DegreeIndex\"))"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Construct networkx graph object\r\n","import networkx as nx\r\n","edges = edgesCombined.selectExpr(\"Source as src\", \"Target as dst\", 
\"InteractionsWeight as wgt\") \\\r\n"," .where((col(\"InteractionsWeight\") >= 0.25) & (col(\"InteractionsWeight\") <= 2000))\r\n","edgesList = [(e.src, e.dst, e.wgt) for e in edges.collect()]\r\n","graph = nx.DiGraph()\r\n","graph.add_weighted_edges_from(edgesList)"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Calculate Influence Index based on page rank\r\n","graphPageRank = nx.pagerank(graph, alpha=0.85, personalization=None, max_iter=100, tol=0.001)"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Define udf for adding page rank to dataframe\r\n","maxPageRank = max(graphPageRank.values())\r\n","def getPageRank(x):\r\n"," pageRank = graphPageRank.get(x)\r\n"," if pageRank is None:\r\n"," return 0\r\n"," return pageRank / maxPageRank\r\n","influenceIndexUdf = udf(getPageRank, t.FloatType())"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Calculate Betweeness Index\r\n","# Commented out since the complexity is O(EV) where E = edges, V = vertices\r\n","# This will be slow for larger graphs, roughly above 10K users\r\n","# graphBetweenness = nx.betweenness_centrality(graph)"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Define udf for adding betweeness to dataframe\r\n","# def getBetweeness(x):\r\n","# return graphBetweenness.get(x)\r\n","# betweenessIndexUdf = udf(getBetweeness, 
t.FloatType())"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Calculate Community Bridging Index based on Leiden community detection\r\n","import graspologic\r\n","from graspologic.partition import leiden\r\n","\r\n","# Constructs undirected graph using bidrectional edges only, see networkx DiGraph.to_undirected doc\r\n","undirectedGraph = graph.to_undirected()\r\n","\r\n","leidenResult = graspologic.partition.hierarchical_leiden(undirectedGraph, max_cluster_size=leidenMaxClusterSize)\r\n","leidenClusters = leidenResult.final_level_hierarchical_clustering()"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Construct udf for mapping users to community label\r\n","def getLabel(x):\r\n"," return leidenClusters.get(x)\r\n","\r\n","labelUdf = udf(getLabel, t.StringType())"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Counts how many communities C a user is connected to with an out edge, normalized by num of communities\r\n","# For all users, compute C / (num of Communities)\r\n","# 1 = they are connected to all communities\r\n","# 0 = they have no connections\r\n","\r\n","# enrich edges by mapping target dst node to community\r\n","edgesLabelled = edges.withColumn(\"Community\", labelUdf(col(\"dst\"))).drop(\"dst\").distinct()\r\n","\r\n","# group on src and count how many distinct community labelled targets each src has\r\n","numCommunities = len(set(leidenClusters.values()))\r\n","communityBridging = edgesLabelled.groupBy(\"src\").count() \\\r\n"," .withColumn(\"CommunityBridgeIndex\", col(\"count\") / 
float(numCommunities)).drop(\"count\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Join all indexes to users and output\r\n","usersEnriched = usersJoined.join(outDegreeEdges, col(\"Source\") == col(\"EmailAddress\"), \"left\").drop(\"Source\") \\\r\n"," .join(inDegreeEdges, col(\"Target\") == col(\"EmailAddress\"), \"left\").drop(\"Target\") \\\r\n"," .fillna(0) \\\r\n"," .withColumn(\"DegreeIndex\", (col(\"In-DegreeIndex\") + col(\"Out-DegreeIndex\")) / (2 * numUsers)) \\\r\n"," .withColumn(\"Community\", labelUdf(col(\"EmailAddress\"))) \\\r\n"," .join(communityBridging, col(\"src\") == col(\"EmailAddress\"), \"left\").drop(\"src\") \\\r\n"," .withColumn(\"InfluenceIndex\", influenceIndexUdf(col(\"EmailAddress\"))) \\\r\n"," .fillna(0) \\\r\n"," .withColumn(\"Period\", lit(period))\r\n","\r\n","if outputFormat == \"csv\":\r\n"," usersEnriched.coalesce(1).write.option(\"header\", True).mode(\"overwrite\").csv(usersOutputPath)\r\n"," \r\n"," Path = sc._gateway.jvm.org.apache.hadoop.fs.Path\r\n"," # get the part file generated by spark write\r\n"," fs = Path(usersOutputPath).getFileSystem(sc._jsc.hadoopConfiguration())\r\n"," part_file = fs.globStatus(Path(usersOutputPath + \"/part*\"))[0].getPath()\r\n"," #set final target path\r\n"," target_path_users = usersOutputPath+\".\"+outputFormat\r\n"," # move and rename the file\r\n"," fs.rename(part_file, Path(target_path_users))\r\n"," fs.delete(Path(usersOutputPath), True)\r\n","elif outputFormat == \"parquet\":\r\n"," usersEnriched.write.option(\"header\", True).mode(\"overwrite\").parquet(usersOutputPath)\r\n","else:\r\n"," raise Exception (\"outputFormat should be csv or parquet\")"],"outputs":[],"execution_count":null}]},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), 
'/onasynapsepool')]","type":"Microsoft.Synapse/workspaces/bigDataPools","apiVersion":"2019-06-01-preview","properties":{"autoPause":{"enabled":true,"delayInMinutes":15},"autoScale":{"enabled":true,"maxNodeCount":3,"minNodeCount":3},"nodeCount":3,"nodeSize":"Small","nodeSizeFamily":"MemoryOptimized","sparkVersion":"3.2","libraryRequirements":{"content":"name: ona-environment\r\nchannels:\r\n- defaults\r\ndependencies:\r\n- pip:\r\n - numpy==1.22.4\r\n - graspologic\r\n - networkx==2.6","filename":"env.yml","time":"2023-02-02T22:54:34.742901Z"},"isComputeIsolationEnabled":false,"sessionLevelPackagesEnabled":true,"annotations":[]},"dependsOn":[],"location":"eastus"},{"name":"[concat(parameters('workspaceName'), '/Microsoft365UserTable')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('Microsoft365')]","type":"LinkedServiceReference"},"annotations":[],"type":"Office365Table","schema":[],"typeProperties":{"tableName":"BasicDataSet_v0.User_v1"}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/UserSink')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('MGDCSink')]","type":"LinkedServiceReference"},"parameters":{"FolderName":{"type":"string"}},"annotations":[],"type":"Binary","typeProperties":{"location":{"type":"AzureBlobFSLocation","folderPath":{"value":"@dataset().FolderName","type":"Expression"},"fileSystem":"mgdc"}}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), 
'/Microsoft365EmailTable')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('Microsoft365')]","type":"LinkedServiceReference"},"annotations":[],"type":"Office365Table","schema":[],"typeProperties":{"tableName":"BasicDataSet_v0.Message_v1"}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/EmailSink')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('MGDCSink')]","type":"LinkedServiceReference"},"parameters":{"FolderName":{"type":"string"}},"annotations":[],"type":"Binary","typeProperties":{"location":{"type":"AzureBlobFSLocation","folderPath":{"value":"@dataset().FolderName","type":"Expression"},"fileSystem":"mgdc"}}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/Microsoft365TeamsChatTable')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('Microsoft365')]","type":"LinkedServiceReference"},"annotations":[],"type":"Office365Table","schema":[],"typeProperties":{"tableName":"BasicDataSet_v0.TeamChat_v1"}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/TeamsChatSink')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('MGDCSink')]","type":"LinkedServiceReference"},"parameters":{"FolderName":{"type":"string"}},"annotations":[],"type":"Binary","typeProperties":{"location":{"type":"AzureBlobFSLocation","folderPath":{"value":"@dataset().FolderName","type":"Expression"},"fileSystem":"mgdc"}}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), 
'/Microsoft365CalendarTable')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('Microsoft365')]","type":"LinkedServiceReference"},"annotations":[],"type":"Office365Table","schema":[],"typeProperties":{"tableName":"BasicDataSet_v0.CalendarView_v0"}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/CalendarSink')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('MGDCSink')]","type":"LinkedServiceReference"},"parameters":{"FolderName":{"type":"string"}},"annotations":[],"type":"Binary","typeProperties":{"location":{"type":"AzureBlobFSLocation","folderPath":{"value":"@dataset().FolderName","type":"Expression"},"fileSystem":"mgdc"}}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/default')]","type":"Microsoft.Synapse/workspaces/managedVirtualNetworks","apiVersion":"2019-06-01-preview","properties":{},"dependsOn":[]}]} \ No newline at end of file +{"$schema":"http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#","contentVersion":"1.0.0.0","parameters":{"workspaceName":{"type":"string","metadata":"Workspace name","defaultValue":"onasynapseljdk5xxwtgp3e"},"Microsoft365":{"type":"string"},"ADLSona":{"type":"string"}},"variables":{"workspaceId":"[concat('Microsoft.Synapse/workspaces/', parameters('workspaceName'))]"},"resources":[{"name":"[concat(parameters('workspaceName'), '/ONA')]","type":"Microsoft.Synapse/workspaces/pipelines","apiVersion":"2019-06-01-preview","properties":{"activities":[{"name":"If GetUserData is true","type":"IfCondition","dependsOn":[{"activity":"Set User FolderName","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.GetUserData","type":"Expression"},"ifTrueActivities":[{"name":"Copy 
User","type":"Copy","dependsOn":[],"policy":{"timeout":"1.1:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"Office365Source","outputColumns":[{"name":"country"},{"name":"department"},{"name":"id"},{"name":"jobTitle"},{"name":"mail"},{"name":"preferredLanguage"},{"name":"state"}]},"sink":{"type":"BinarySink","storeSettings":{"type":"AzureBlobFSWriteSettings"}},"enableStaging":false},"inputs":[{"referenceName":"Microsoft365UserTable2","type":"DatasetReference","parameters":{}}],"outputs":[{"referenceName":"UserSink2","type":"DatasetReference","parameters":{"FolderName":{"value":"@variables('UserFolderName')","type":"Expression"}}}]}]}},{"name":"If GetEmailData is true","type":"IfCondition","dependsOn":[{"activity":"Set Email FolderName","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.GetEmailData","type":"Expression"},"ifTrueActivities":[{"name":"Copy 
Email","type":"Copy","dependsOn":[],"policy":{"timeout":"1.1:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"Office365Source","dateFilterColumn":"sentDateTime","startTime":{"value":"@formatDateTime(pipeline().parameters.StartDate)","type":"Expression"},"endTime":{"value":"@formatDateTime(pipeline().parameters.EndDate)","type":"Expression"},"outputColumns":[{"name":"receivedDateTime"},{"name":"sentDateTime"},{"name":"sender"},{"name":"from"},{"name":"toRecipients"},{"name":"ccRecipients"},{"name":"bccRecipients"},{"name":"conversationId"},{"name":"isRead"},{"name":"createdDateTime"},{"name":"id"},{"name":"conversationIndex"}]},"sink":{"type":"BinarySink","storeSettings":{"type":"AzureBlobFSWriteSettings"}},"enableStaging":false},"inputs":[{"referenceName":"Microsoft365EmailTable2","type":"DatasetReference","parameters":{}}],"outputs":[{"referenceName":"EmailSink2","type":"DatasetReference","parameters":{"FolderName":{"value":"@variables('EmailFolderName')","type":"Expression"}}}]}]}},{"name":"If GetTeamsChatData is true","type":"IfCondition","dependsOn":[{"activity":"Set TeamsChat FolderName","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.GetTeamsChatData","type":"Expression"},"ifTrueActivities":[{"name":"Copy 
TeamsChat","type":"Copy","dependsOn":[],"policy":{"timeout":"1.1:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"Office365Source","dateFilterColumn":"SentDateTime","startTime":{"value":"@formatDateTime(pipeline().parameters.StartDate)","type":"Expression"},"endTime":{"value":"@formatDateTime(pipeline().parameters.EndDate)","type":"Expression"},"outputColumns":[{"name":"Id"},{"name":"ReceivedDateTime"},{"name":"SentDateTime"},{"name":"InternetMessageId"},{"name":"ConversationId"},{"name":"ConversationIndex"},{"name":"Sender"},{"name":"From"},{"name":"ToRecipients"}]},"sink":{"type":"BinarySink","storeSettings":{"type":"AzureBlobFSWriteSettings"}},"enableStaging":false},"inputs":[{"referenceName":"Microsoft365TeamsChatTable2","type":"DatasetReference","parameters":{}}],"outputs":[{"referenceName":"TeamsChatSink2","type":"DatasetReference","parameters":{"FolderName":{"value":"@variables('TeamsChatFolderName')","type":"Expression"}}}]}]}},{"name":"If GetCalendarData is true","type":"IfCondition","dependsOn":[{"activity":"Set Calendar FileName","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.GetCalendarData","type":"Expression"},"ifTrueActivities":[{"name":"Copy 
Calendar","type":"Copy","dependsOn":[],"policy":{"timeout":"1.1:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"Office365Source","startTime":{"value":"@formatDateTime(pipeline().parameters.StartDate)","type":"Expression"},"endTime":{"value":"@formatDateTime(pipeline().parameters.EndDate)","type":"Expression"},"outputColumns":[{"name":"id"},{"name":"createdDateTime"},{"name":"iCalUId"},{"name":"start"},{"name":"end"},{"name":"isAllDay"},{"name":"isCancelled"},{"name":"attendees"},{"name":"organizer"},{"name":"isOrganizer"}]},"sink":{"type":"BinarySink","storeSettings":{"type":"AzureBlobFSWriteSettings"}},"enableStaging":false},"inputs":[{"referenceName":"Microsoft365CalendarTable2","type":"DatasetReference","parameters":{}}],"outputs":[{"referenceName":"CalendarSink2","type":"DatasetReference","parameters":{"FolderName":{"value":"@variables('CalendarFolderName')","type":"Expression"}}}]}]}},{"name":"Run ONA Notebook","type":"SynapseNotebook","dependsOn":[{"activity":"Convert StorageUrl to Abfss Path","dependencyConditions":["Succeeded"]}],"policy":{"timeout":"2.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"notebook":{"referenceName":"ONA","type":"NotebookReference"},"parameters":{"calendarPath":{"value":{"value":"@concat(variables('StorageAbfssPath'), variables('CalendarFolderName'))","type":"Expression"},"type":"string"},"emailPath":{"value":{"value":"@concat(variables('StorageAbfssPath'), variables('EmailFolderName'))\n","type":"Expression"},"type":"string"},"teamsChatPath":{"value":{"value":"@concat(variables('StorageAbfssPath'), variables('TeamsChatFolderName'))","type":"Expression"},"type":"string"},"userPath":{"value":{"value":"@concat(variables('StorageAbfssPath'), 
variables('UserFolderName'))","type":"Expression"},"type":"string"},"outputFormat":{"value":"csv","type":"string"},"usersOutputPath":{"value":{"value":"@concat(replace(variables('StorageAbfssPath'),'mgdc@','output@'),'users_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate, '.csv')","type":"Expression"},"type":"string"},"interactionsOutputPath":{"value":{"value":"@concat(replace(variables('StorageAbfssPath'),'mgdc@','output@'),'interactions_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate, '.csv')","type":"Expression"},"type":"string"},"period":{"value":{"value":"@concat(pipeline().parameters.StartDate, ' to ', pipeline().parameters.EndDate)","type":"Expression"},"type":"string"},"obfuscateEmails":{"value":{"value":"@bool(1)","type":"Expression"},"type":"bool"},"leidenMaxClusterSize":{"value":"1000","type":"int"}},"snapshot":true,"executorSize":null,"conf":{"spark.dynamicAllocation.enabled":null,"spark.dynamicAllocation.minExecutors":null,"spark.dynamicAllocation.maxExecutors":null},"driverSize":null,"numExecutors":null}},{"name":"Set User FolderName","type":"SetVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"UserFolderName","value":{"value":"@concat('user_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate)","type":"Expression"}}},{"name":"Set Email FolderName","type":"SetVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"EmailFolderName","value":{"value":"@concat('email_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate)","type":"Expression"}}},{"name":"Set TeamsChat FolderName","type":"SetVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"TeamsChatFolderName","value":{"value":"@concat('teamschat_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate)","type":"Expression"}}},{"name":"Set Calendar 
FileName","type":"SetVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"CalendarFolderName","value":{"value":"@concat('calendar_', pipeline().parameters.StartDate, '_to_', pipeline().parameters.EndDate)","type":"Expression"}}},{"name":"Convert StorageUrl to Abfss Path","type":"SetVariable","dependsOn":[{"activity":"If GetCalendarData is true","dependencyConditions":["Succeeded"]},{"activity":"If GetTeamsChatData is true","dependencyConditions":["Succeeded"]},{"activity":"If GetEmailData is true","dependencyConditions":["Succeeded"]},{"activity":"If GetUserData is true","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"variableName":"StorageAbfssPath","value":{"value":"@concat(replace(replace(trim(pipeline().parameters.StorageUrl), 'https://', 'abfss://mgdc@'),'dfs.core.windows.net/','dfs.core.windows.net'),'/')\n","type":"Expression"}}}],"policy":{"elapsedTimeMetric":{},"cancelAfter":{}},"parameters":{"StartDate":{"type":"string"},"EndDate":{"type":"string"},"StorageUrl":{"type":"string","defaultValue":"https://.dfs.core.windows.net/"},"GetUserData":{"type":"bool","defaultValue":true},"GetEmailData":{"type":"bool","defaultValue":true},"GetCalendarData":{"type":"bool","defaultValue":true},"GetTeamsChatData":{"type":"bool","defaultValue":true}},"variables":{"CalendarFolderName":{"type":"String"},"EmailFolderName":{"type":"String"},"TeamsChatFolderName":{"type":"String"},"UserFolderName":{"type":"String"},"StorageAbfssPath":{"type":"String"}},"annotations":[],"lastPublishTime":"2023-03-15T07:49:35Z"},"dependsOn":["[concat(variables('workspaceId'), '/notebooks/ONA')]","[concat(variables('workspaceId'), '/datasets/Microsoft365UserTable2')]","[concat(variables('workspaceId'), '/datasets/UserSink2')]","[concat(variables('workspaceId'), '/datasets/Microsoft365EmailTable2')]","[concat(variables('workspaceId'), '/datasets/EmailSink2')]","[concat(variables('workspaceId'), 
'/datasets/Microsoft365TeamsChatTable2')]","[concat(variables('workspaceId'), '/datasets/TeamsChatSink2')]","[concat(variables('workspaceId'), '/datasets/Microsoft365CalendarTable2')]","[concat(variables('workspaceId'), '/datasets/CalendarSink2')]"]},{"name":"[concat(parameters('workspaceName'), '/ONA')]","type":"Microsoft.Synapse/workspaces/notebooks","apiVersion":"2019-06-01-preview","properties":{"nbformat":4,"nbformat_minor":2,"bigDataPool":{"referenceName":"onasynapsepool","type":"BigDataPoolReference"},"sessionProperties":{"driverMemory":"28g","driverCores":4,"executorMemory":"28g","executorCores":4,"numExecutors":2,"runAsWorkspaceSystemIdentity":false,"conf":{"spark.dynamicAllocation.enabled":"false","spark.dynamicAllocation.minExecutors":"2","spark.dynamicAllocation.maxExecutors":"2","spark.autotune.trackingId":"c091b50f-aaa1-40f1-b3a9-91b3445e23a8"}},"metadata":{"saveOutput":true,"synapse_widget":{"version":"0.1","state":{}},"enableDebugMode":false,"kernelspec":{"name":"synapse_pyspark","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"a365ComputeOptions":{"id":"/subscriptions/30a81c99-6121-40ba-99d7-ac674961cd7e/resourceGroups/ONA_demo_RG_yashah/providers/Microsoft.Synapse/workspaces/onasynapseljdk5xxwtgp3e/bigDataPools/onasynapsepool","name":"onasynapsepool","type":"Spark","endpoint":"https://onasynapseljdk5xxwtgp3e.dev.azuresynapse.net/livyApi/versions/2019-11-01-preview/sparkPools/onasynapsepool","auth":{"type":"AAD","authResource":"https://dev.azuresynapse.net"},"sparkVersion":"3.1","nodeCount":3,"cores":4,"memory":28,"automaticScaleJobs":false},"sessionKeepAliveTimeout":30},"cells":[{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"tags":["parameters"]},"source":["# Default parameters that can be freely changed or overridden by pipeline run \r\n","\r\n","# Inputs\r\n","calendarPath = 
\"abfss://mgdc@onastorage.dfs.core.windows.net/calendar_2022-06-01_to_2022-11-07/\"\r\n","emailPath = \"abfss://mgdc@onastorage.dfs.core.windows.net/email_2022-06-01_to_2022-11-07/\"\r\n","teamsChatPath = \"abfss://mgdc@onastorage.dfs.core.windows.net/teamschat_2022-06-01_to_2022-11-07/\"\r\n","userPath = \"abfss://mgdc@onastorage.dfs.core.windows.net/user_2022-06-01_to_2022-11-07/\"\r\n","\r\n","#Output Format: Can be csv or parquet\r\n","outputFormat = \"csv\"\r\n","\r\n","# Output path of user vertices\r\n","usersOutputPath = \"abfss://output@onastorage.dfs.core.windows.net/users_2022-06-01_to_2022-11-01_weighted.csv\"\r\n","\r\n","# Output path of user to user edges\r\n","interactionsOutputPath = \"abfss://output@onastorage.dfs.core.windows.net/interactions_2022-06-01_to_2022-11-01_weighted.csv\"\r\n","\r\n","# StartDate/EndDate for this run that is denormalized to users and interactions tables\r\n","period = \"2022-06-01 to 2022-11-07\"\r\n","\r\n","# Whether or not to md5 hash the input user emails\r\n","obfuscateEmails = True\r\n","\r\n","# Leiden max cluster size, the maximum possible size for a detected community\r\n","leidenMaxClusterSize = 1000"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"microsoft":{},"collapsed":false},"source":["# Load data\r\n","try:\r\n"," emailsRaw = spark.read.json(emailPath)\r\n","except (Exception) as error:\r\n"," print(error)\r\n"," raise Exception(\"Emails data not loaded, continuing without emails\")\r\n","\r\n","try:\r\n"," meetingsRaw = spark.read.json(calendarPath)\r\n","except (Exception) as error:\r\n"," print(error)\r\n"," raise Exception(\"Calendar data not loaded, continuing without meetings\")\r\n","\r\n","try:\r\n"," teamsChatsRaw = spark.read.json(teamsChatPath)\r\n","except (Exception) as error:\r\n"," print(error)\r\n"," raise Exception(\"TeamsChats data not loaded, continuing without teams messages\")\r\n","\r\n","try:\r\n"," usersRaw = spark.read.json(userPath)\r\n","except 
(Exception) as error:\r\n"," print(error)\r\n"," raise Exception(\"Users data not loaded. Check the file path.\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Drop duplicates\r\n","usersDedup = usersRaw.dropDuplicates([\"puser\"])\r\n","emailsDedup = emailsRaw.dropDuplicates([\"Id\"]).select(\"Sender\", \"ToRecipients\")\r\n","teamschatsDedup = teamsChatsRaw.dropDuplicates([\"Id\"]).select(\"Sender\", \"ToRecipients\")\r\n","meetingsDedup = meetingsRaw.dropDuplicates([\"Id\"]).select(\"organizer\", \"attendees\", \"start\", \"end\", \"isAllDay\", \"isCancelled\", \"isOrganizer\", \"iCalUId\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["from pyspark.sql.functions import coalesce, col, count, explode, format_number, isnull, lit, md5, rand, size, udf, unix_timestamp\r\n","import pyspark.sql.functions as F\r\n","from pyspark.sql import types as t"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Get the user email addresses and filter emails, teamschat, and meetings to only contain edges with those users\r\n","usersEmailAddresses = usersDedup.selectExpr(\"lower(mail) as id\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Explode row with one sender -> N recipients into N rows\r\n","# Filter to only keep emails with 8 or less recipients\r\n","emails = emailsDedup.where(size(col(\"ToRecipients\")) <= 8) \\\r\n"," .withColumn(\"weight\", 1.0/size(col(\"ToRecipients\"))) \\\r\n"," 
.select(F.lower(col(\"Sender.EmailAddress.Address\")).alias(\"sender\"), col(\"weight\"), explode(col(\"ToRecipients\")).alias(\"exploded\")) \\\r\n"," .join(usersEmailAddresses, col(\"id\") == col(\"sender\"), \"inner\").drop(\"id\") \\\r\n"," .join(usersEmailAddresses, col(\"id\") == F.lower(col(\"exploded.EmailAddress.Address\")), \"inner\").drop(\"id\") \\\r\n"," .withColumnRenamed(\"sender\", \"src\") \\\r\n"," .withColumn(\"dst\", F.lower(col(\"exploded.EmailAddress.Address\"))) \\\r\n"," .select(col(\"src\"), col(\"dst\"), col(\"weight\")) \\\r\n"," .where(col(\"src\") != col(\"dst\"))\r\n","if obfuscateEmails:\r\n"," emails = emails.withColumn(\"srcHash\", md5(col(\"src\"))) \\\r\n"," .withColumn(\"dstHash\", md5(col(\"dst\"))) \\\r\n"," .drop(\"src\", \"dst\").selectExpr(\"srcHash as src\", \"dstHash as dst\", \"weight\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Explode row with one organizer -> N attendees into N rows\r\n","# Filter to only keep meetings with at least 2 and at most 9 attendees. 
(Number of attendees includes the organizer)\r\n","# Filter out cancelled and all day meetings\r\n","# Filter to the meeting instance belonging to the organizer's calendar\r\n","# Weight by meeting length in seconds divided by 400 (6.67 minutes) and divided by the number of recipients\r\n","\r\n","dtFormat = \"yyyy-MM-dd'T'HH:mm:ss.SSSSSSS\"\r\n","meetings = meetingsDedup.where((size(col(\"attendees\")) <= 9) & (size(col(\"attendees\")) >= 2)) \\\r\n"," .where((col(\"isAllDay\") == False) & (col(\"isCancelled\") == False) & (col(\"isOrganizer\") == True)) \\\r\n"," .withColumn(\"meetingDurationInSeconds\", unix_timestamp(col(\"end.dateTime\"), dtFormat).cast(\"long\") - unix_timestamp(col(\"start.dateTime\"), dtFormat).cast(\"long\")) \\\r\n"," .withColumn(\"weight\", (col(\"meetingDurationInSeconds\")/400.0) / (size(col(\"attendees\")) - 1)) \\\r\n"," .select(F.lower(col(\"organizer.emailAddress.address\")).alias(\"sender\"), col(\"weight\"), col(\"meetingDurationInSeconds\"), col(\"attendees\"), col(\"iCalUId\"), explode(col(\"attendees\")).alias(\"exploded\")) \\\r\n"," .join(usersEmailAddresses, col(\"id\") == col(\"sender\"), \"inner\").drop(\"id\") \\\r\n"," .join(usersEmailAddresses, col(\"id\") == F.lower(col(\"exploded.EmailAddress.Address\")), \"inner\").drop(\"id\") \\\r\n"," .withColumnRenamed(\"sender\", \"src\") \\\r\n"," .withColumn(\"dst\", F.lower(col(\"exploded.EmailAddress.Address\"))) \\\r\n"," .select(col(\"src\"), col(\"dst\"), col(\"weight\"), col(\"meetingDurationInSeconds\"), col(\"iCalUId\"), col(\"attendees\")) \\\r\n"," .where(col(\"src\") != col(\"dst\"))\r\n","if obfuscateEmails:\r\n"," meetings = meetings.withColumn(\"srcHash\", md5(col(\"src\"))) \\\r\n"," .withColumn(\"dstHash\", md5(col(\"dst\"))) \\\r\n"," .drop(\"src\", \"dst\").selectExpr(\"srcHash as src\", \"dstHash as dst\", \"weight\", \"meetingDurationInSeconds\", 
\"iCalUId\",\"attendees\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Explode row with one sender -> N recipients into N rows\r\n","# Filter to only keep teamschat messages with 8 or less recipients\r\n","teamschats = teamschatsDedup.where(size(col(\"ToRecipients\")) <= 8) \\\r\n"," .withColumn(\"weight\", 1.0/(8*size(col(\"ToRecipients\")))) \\\r\n"," .select(F.lower(col(\"Sender.EmailAddress.Address\")).alias(\"sender\"), col(\"weight\"), explode(col(\"ToRecipients\")).alias(\"exploded\")) \\\r\n"," .join(usersEmailAddresses, col(\"id\") == col(\"sender\"), \"inner\").drop(\"id\") \\\r\n"," .join(usersEmailAddresses, col(\"id\") == F.lower(col(\"exploded.EmailAddress.Address\")), \"inner\").drop(\"id\") \\\r\n"," .withColumnRenamed(\"sender\", \"src\") \\\r\n"," .withColumn(\"dst\", F.lower(col(\"exploded.EmailAddress.Address\"))) \\\r\n"," .select(col(\"src\"), col(\"dst\"), col(\"weight\")) \\\r\n"," .where(col(\"src\") != col(\"dst\"))\r\n","if obfuscateEmails:\r\n"," teamschats = teamschats.withColumn(\"srcHash\", md5(col(\"src\"))) \\\r\n"," .withColumn(\"dstHash\", md5(col(\"dst\"))) \\\r\n"," .drop(\"src\", \"dst\").selectExpr(\"srcHash as src\", \"dstHash as dst\", \"weight\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Join after counting and summing weights from emails, teams chats, and meetings\r\n","emailEdges = emails.groupBy(\"src\", \"dst\").agg(F.count(col(\"dst\")).alias(\"InteractionsEmail\"), F.sum(col(\"weight\")).alias(\"EmailWeight\")) \\\r\n"," .withColumnRenamed(\"src\", \"src1\").withColumnRenamed(\"dst\", \"dst1\")\r\n","\r\n","meetingEdges = meetings.groupBy(\"src\", 
\"dst\").agg(F.count(col(\"dst\")).alias(\"InteractionsMeetings\"), F.sum(col(\"weight\")).alias(\"MeetingsWeight\")) \\\r\n"," .withColumnRenamed(\"src\", \"src2\").withColumnRenamed(\"dst\", \"dst2\")\r\n","\r\n","teamsChatEdges = teamschats.groupBy(\"src\", \"dst\").agg(F.count(col(\"dst\")).alias(\"InteractionsTeamsChat\"), F.sum(col(\"weight\")).alias(\"TeamsChatWeight\")) \\\r\n"," .withColumnRenamed(\"src\", \"src3\").withColumnRenamed(\"dst\", \"dst3\")\r\n","\r\n","allEdges = emailEdges.alias(\"e\").join(meetingEdges.alias(\"m\"), (col(\"src1\") == col(\"src2\")) & (col(\"dst1\") == col(\"dst2\")), \"full\") \\\r\n"," .join(teamsChatEdges.alias(\"t\"), (col(\"src1\") == col(\"src3\")) & (col(\"dst1\") == col(\"dst3\")), \"full\")\r\n"," "],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Coalesce together src/dst duplicate columns after join\r\n","teamsChatToEmailRatio = 8 # interaction ratio for teamschat to email\r\n","edgesCombined = allEdges.select(\r\n"," coalesce( *[col(c) for c in [\"src1\", \"src2\", \"src3\"]]).alias(\"Source\"),\r\n"," coalesce( *[col(c) for c in [\"dst1\", \"dst2\", \"dst3\"]]).alias(\"Target\"),\r\n"," col(\"InteractionsEmail\"),\r\n"," col(\"InteractionsMeetings\"),\r\n"," col(\"InteractionsTeamsChat\"),\r\n"," col(\"EmailWeight\"),\r\n"," col(\"MeetingsWeight\"),\r\n"," col(\"TeamsChatWeight\")\r\n"," ).fillna(0) \\\r\n"," .withColumn(\"Interactions\", (col(\"InteractionsEmail\") + col(\"InteractionsMeetings\") + F.round(col(\"InteractionsTeamsChat\")/teamsChatToEmailRatio)).cast('int')) \\\r\n"," .withColumn(\"InteractionsWeight\", (col(\"EmailWeight\") + col(\"MeetingsWeight\") + col(\"TeamsChatWeight\")/teamsChatToEmailRatio)) \\\r\n"," .withColumn(\"Period\", lit(period))\r\n","\r\n","if outputFormat == \"csv\":\r\n"," edgesCombined.coalesce(1).write.option(\"header\", 
True).mode(\"overwrite\").csv(interactionsOutputPath)\r\n"," \r\n"," Path = sc._gateway.jvm.org.apache.hadoop.fs.Path\r\n"," # get the part file generated by spark write\r\n"," fs = Path(interactionsOutputPath).getFileSystem(sc._jsc.hadoopConfiguration())\r\n"," part_file = fs.globStatus(Path(interactionsOutputPath + \"/part*\"))[0].getPath()\r\n"," # set final target path\r\n"," target_path_interactions = interactionsOutputPath+\".\"+outputFormat\r\n"," # move and rename the file\r\n"," fs.rename(part_file, Path(target_path_interactions))\r\n"," fs.delete(Path(interactionsOutputPath), True)\r\n","elif outputFormat == \"parquet\":\r\n"," edgesCombined.write.option(\"header\", True).mode(\"overwrite\").parquet(interactionsOutputPath)\r\n","else:\r\n"," raise Exception (\"outputFormat should be csv or parquet\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["eventsOrganized = meetings.groupBy(\"src\").count().withColumnRenamed(\"count\", \"NumberOfEventsOrganized\")\r\n","eventsAttended = meetings.groupBy(\"dst\").count().withColumnRenamed(\"count\", \"NumberOfEventsAttended\")\r\n","emailsSent = emails.groupBy(\"src\").count().withColumnRenamed(\"count\", \"NumberOfEmailsSent\")\r\n","emailsReceived = emails.groupBy(\"dst\").count().withColumnRenamed(\"count\", \"NumberOfEmailsReceived\")\r\n","teamsChatsSent = teamschats.groupBy(\"src\").count().withColumnRenamed(\"count\", \"NumberOfChatsSent\")\r\n","teamsChatsReceived = teamschats.groupBy(\"dst\").count().withColumnRenamed(\"count\", \"NumberOfChatsReceived\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Select user properties for output and join all raw email/teamschat/meeting counts\r\n","if 
obfuscateEmails:\r\n"," usersDedup = usersDedup.withColumn(\"EmailAddress\", md5(F.lower(col(\"mail\"))))\r\n","else:\r\n"," usersDedup = usersDedup.withColumn(\"EmailAddress\", F.lower(col(\"mail\")))\r\n","usersRenamed = usersDedup.selectExpr(\"EmailAddress\", \"department as Department\", \"jobTitle as Title\", \"state as StateOrProvince\",\r\n"," \"country as Country\",\"preferredLanguage as Languages\",\"ptenant as TenantID\")\r\n","usersJoined = usersRenamed.join(eventsOrganized, col(\"src\") == col(\"EmailAddress\"), \"left\").drop(\"src\") \\\r\n"," .join(eventsAttended, col(\"dst\") == col(\"EmailAddress\"), \"left\").drop(\"dst\") \\\r\n"," .join(emailsSent, col(\"src\") == col(\"EmailAddress\"), \"left\").drop(\"src\") \\\r\n"," .join(emailsReceived, col(\"dst\") == col(\"EmailAddress\"), \"left\").drop(\"dst\") \\\r\n"," .join(teamsChatsSent, col(\"src\") == col(\"EmailAddress\"), \"left\").drop(\"src\") \\\r\n"," .join(teamsChatsReceived, col(\"dst\") == col(\"EmailAddress\"), \"left\").drop(\"dst\") \\\r\n"," .fillna(0)\r\n","numUsers = usersJoined.count()"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Calculate out-degrees and in-degrees based on number of connections\r\n","outDegreeEdges = edgesCombined.where(col(\"Interactions\") > 0).groupBy(\"Source\").count().select(col(\"Source\"), col(\"count\").alias(\"Out-DegreeIndex\"))\r\n","inDegreeEdges = edgesCombined.where(col(\"Interactions\") > 0).groupBy(\"Target\").count().select(col(\"Target\"), col(\"count\").alias(\"In-DegreeIndex\"))"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Construct networkx graph object\r\n","import networkx as nx\r\n","edges = edgesCombined.selectExpr(\"Source as src\", \"Target as dst\", 
\"InteractionsWeight as wgt\") \\\r\n"," .where((col(\"InteractionsWeight\") >= 0.25) & (col(\"InteractionsWeight\") <= 2000))\r\n","edgesList = [(e.src, e.dst, e.wgt) for e in edges.collect()]\r\n","graph = nx.DiGraph()\r\n","graph.add_weighted_edges_from(edgesList)"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Calculate Influence Index based on page rank\r\n","graphPageRank = nx.pagerank(graph, alpha=0.85, personalization=None, max_iter=100, tol=0.001)"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Define udf for adding page rank to dataframe\r\n","maxPageRank = max(graphPageRank.values())\r\n","def getPageRank(x):\r\n"," pageRank = graphPageRank.get(x)\r\n"," if pageRank is None:\r\n"," return 0\r\n"," return pageRank / maxPageRank\r\n","influenceIndexUdf = udf(getPageRank, t.FloatType())"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Calculate Betweeness Index\r\n","# Commented out since the complexity is O(EV) where E = edges, V = vertices\r\n","# This will be slow for larger graphs, roughly above 10K users\r\n","# graphBetweenness = nx.betweenness_centrality(graph)"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Define udf for adding betweeness to dataframe\r\n","# def getBetweeness(x):\r\n","# return graphBetweenness.get(x)\r\n","# betweenessIndexUdf = udf(getBetweeness, 
t.FloatType())"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Calculate Community Bridging Index based on Leiden community detection\r\n","import graspologic\r\n","from graspologic.partition import leiden\r\n","\r\n","# Constructs undirected graph; with the default reciprocal=False an edge in either direction is kept (reciprocal=True keeps bidirectional edges only), see networkx DiGraph.to_undirected doc\r\n","undirectedGraph = graph.to_undirected()\r\n","\r\n","leidenResult = graspologic.partition.hierarchical_leiden(undirectedGraph, max_cluster_size=leidenMaxClusterSize)\r\n","leidenClusters = leidenResult.final_level_hierarchical_clustering()"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Construct udf for mapping users to community label\r\n","def getLabel(x):\r\n"," return leidenClusters.get(x)\r\n","\r\n","labelUdf = udf(getLabel, t.StringType())"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}}},"source":["# Counts how many communities C a user is connected to with an out edge, normalized by num of communities\r\n","# For all users, compute C / (num of Communities)\r\n","# 1 = they are connected to all communities\r\n","# 0 = they have no connections\r\n","\r\n","# enrich edges by mapping target dst node to community\r\n","edgesLabelled = edges.withColumn(\"Community\", labelUdf(col(\"dst\"))).drop(\"dst\").distinct()\r\n","\r\n","# group on src and count how many distinct community labelled targets each src has\r\n","numCommunities = len(set(leidenClusters.values()))\r\n","communityBridging = edgesLabelled.groupBy(\"src\").count() \\\r\n"," .withColumn(\"CommunityBridgeIndex\", col(\"count\") / 
float(numCommunities)).drop(\"count\")"],"outputs":[],"execution_count":null},{"cell_type":"code","metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"collapsed":false},"source":["# Join all indexes to users and output\r\n","usersEnriched = usersJoined.join(outDegreeEdges, col(\"Source\") == col(\"EmailAddress\"), \"left\").drop(\"Source\") \\\r\n"," .join(inDegreeEdges, col(\"Target\") == col(\"EmailAddress\"), \"left\").drop(\"Target\") \\\r\n"," .fillna(0) \\\r\n"," .withColumn(\"DegreeIndex\", (col(\"In-DegreeIndex\") + col(\"Out-DegreeIndex\")) / (2 * numUsers)) \\\r\n"," .withColumn(\"Community\", labelUdf(col(\"EmailAddress\"))) \\\r\n"," .join(communityBridging, col(\"src\") == col(\"EmailAddress\"), \"left\").drop(\"src\") \\\r\n"," .withColumn(\"InfluenceIndex\", influenceIndexUdf(col(\"EmailAddress\"))) \\\r\n"," .fillna(0) \\\r\n"," .withColumn(\"Period\", lit(period))\r\n","\r\n","if outputFormat == \"csv\":\r\n"," usersEnriched.coalesce(1).write.option(\"header\", True).mode(\"overwrite\").csv(usersOutputPath)\r\n"," \r\n"," Path = sc._gateway.jvm.org.apache.hadoop.fs.Path\r\n"," # get the part file generated by spark write\r\n"," fs = Path(usersOutputPath).getFileSystem(sc._jsc.hadoopConfiguration())\r\n"," part_file = fs.globStatus(Path(usersOutputPath + \"/part*\"))[0].getPath()\r\n"," #set final target path\r\n"," target_path_users = usersOutputPath+\".\"+outputFormat\r\n"," # move and rename the file\r\n"," fs.rename(part_file, Path(target_path_users))\r\n"," fs.delete(Path(usersOutputPath), True)\r\n","elif outputFormat == \"parquet\":\r\n"," usersEnriched.write.option(\"header\", True).mode(\"overwrite\").parquet(usersOutputPath)\r\n","else:\r\n"," raise Exception (\"outputFormat should be csv or parquet\")"],"outputs":[],"execution_count":null}]},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), 
'/Microsoft365UserTable2')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('Microsoft365')]","type":"LinkedServiceReference"},"annotations":[],"type":"Office365Table","schema":[],"typeProperties":{"tableName":"BasicDataSet_v0.User_v1"}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/UserSink2')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('ADLSona')]","type":"LinkedServiceReference"},"parameters":{"FolderName":{"type":"string"}},"annotations":[],"type":"Binary","typeProperties":{"location":{"type":"AzureBlobFSLocation","folderPath":{"value":"@dataset().FolderName","type":"Expression"},"fileSystem":"mgdc"}}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/Microsoft365EmailTable2')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('Microsoft365')]","type":"LinkedServiceReference"},"annotations":[],"type":"Office365Table","schema":[],"typeProperties":{"tableName":"BasicDataSet_v0.Message_v1"}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/EmailSink2')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('ADLSona')]","type":"LinkedServiceReference"},"parameters":{"FolderName":{"type":"string"}},"annotations":[],"type":"Binary","typeProperties":{"location":{"type":"AzureBlobFSLocation","folderPath":{"value":"@dataset().FolderName","type":"Expression"},"fileSystem":"mgdc"}}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), 
'/Microsoft365TeamsChatTable2')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('Microsoft365')]","type":"LinkedServiceReference"},"annotations":[],"type":"Office365Table","schema":[],"typeProperties":{"tableName":"BasicDataSet_v0.TeamChat_v1"}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/TeamsChatSink2')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('ADLSona')]","type":"LinkedServiceReference"},"parameters":{"FolderName":{"type":"string"}},"annotations":[],"type":"Binary","typeProperties":{"location":{"type":"AzureBlobFSLocation","folderPath":{"value":"@dataset().FolderName","type":"Expression"},"fileSystem":"mgdc"}}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/Microsoft365CalendarTable2')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('Microsoft365')]","type":"LinkedServiceReference"},"annotations":[],"type":"Office365Table","schema":[],"typeProperties":{"tableName":"BasicDataSet_v0.CalendarView_v0"}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/CalendarSink2')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('ADLSona')]","type":"LinkedServiceReference"},"parameters":{"FolderName":{"type":"string"}},"annotations":[],"type":"Binary","typeProperties":{"location":{"type":"AzureBlobFSLocation","folderPath":{"value":"@dataset().FolderName","type":"Expression"},"fileSystem":"mgdc"}}},"dependsOn":[]}]} \ No newline at end of file